#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import binascii
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
# Full external requirements are: grpcio, certifi.
try:
  import grpc
  from google import auth as google_auth
  from google.auth.transport import grpc as google_auth_transport_grpc
  from google.auth.transport import requests as google_auth_transport_requests
  from proto import bytestream_pb2
except ImportError as err:
  grpc = None
  bytestream_pb2 = None

# If gRPC is installed, at least give a warning if certifi is not. certifi is
# not actually used anywhere in this module, but if it is missing,
# google.auth.transport will fail as described in
# https://stackoverflow.com/questions/24973326
if grpc is not None:
  try:
    import certifi
  except ImportError as err:
    logging.warning(
        'could not import certifi; gRPC HTTPS connections may fail')

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download is aborted.
DOWNLOAD_READ_TIMEOUT = 60


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread, and finally returns to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


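# Illustrative sketch (not part of the original module): a minimal in-memory
# Item subclass showing how content() and prepare() fit together. The class
# name and the use of a plain byte string buffer are assumptions made for
# illustration only.
class _ExampleBufferItem(Item):
  """Holds a byte string in memory and yields it as a single chunk."""

  def __init__(self, buf, high_priority=False):
    # The size is known up front; the digest is left for prepare(), which
    # hashes the chunks yielded by content() with the Storage hash algorithm.
    super(_ExampleBufferItem, self).__init__(
        digest=None, size=len(buf), high_priority=high_priority)
    self._buf = buf

  def content(self):
    yield self._buf

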
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the doc string for 'push'.
    """
    raise NotImplementedError()


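# Illustrative sketch (not part of the original module): the contains/push
# workflow that the higher-level Storage class drives on top of a StorageApi.
# The function name and the explicit prepare() loop are assumptions for
# illustration only; in production Storage performs these steps itself.
def _example_upload_missing(storage_api, items, hash_algo):
  """Uploads only the items the server reports as missing."""
  for item in items:
    # 'contains' requires digest and size to be known.
    item.prepare(hash_algo)
  missing = storage_api.contains(items)
  for item, push_state in missing.items():
    storage_api.push(item, push_state)
  return missing

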
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # Cap in-flight uploads at a quarter of sys.maxsize (one byte less than
    # 512 MiB on 32-bit Python). This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)


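# Illustrative sketch (not part of the original module): the minimal contract
# guard_memory_use() expects from a "server" object, and the reserve/release
# bracketing that IsolateServer.push and IsolateServerGrpc.push perform around
# an upload. The names below are assumptions for illustration only.
class _ExampleGuardedServer(object):
  def __init__(self):
    self._lock = threading.Lock()
    self._memory_use = 0


def _example_guarded_upload(server, item, content):
  """Reserves memory for an upload and always releases it afterwards."""
  guard_memory_use(server, content, item.size)
  try:
    pass  # ... perform the actual upload of |content| here ...
  finally:
    with server._lock:
      server._memory_use -= item.size

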
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % self._base_url
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exists: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to re-upload the contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'File %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if the file is already
    # present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0

    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces if necessary
    assert namespace == 'default-gzip'
    self._namespace = namespace

    # Make sure grpc was successfully imported.
    assert grpc
    assert bytestream_pb2

    proxy = os.environ.get('ISOLATED_GRPC_PROXY', '')
    roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS')
    overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE')

    # The "proxy" envvar must be of the form:
    # http[s]://<server>[:port][/prefix]
    m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
    if not m:
      raise ValueError(('gRPC proxy must have the form: '
                        'http[s]://<server>[:port][/prefix] '
                        '(given: %s)') % proxy)
    transport = m.group(1)
    host = m.group(2)
    prefix = m.group(3)
    if not prefix.endswith('/'):
      prefix = prefix + '/'
    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
                 transport, host, prefix)
    self._prefix = prefix

    if transport == 'http':
      self._channel = grpc.insecure_channel(host)
    elif transport == 'https':
      # Using cloud container builder scopes for testing:
      scopes = ('https://www.googleapis.com/auth/cloud-build-service',)
      credentials, _ = google_auth.default(scopes=scopes)
      request = google_auth_transport_requests.Request()
      options = ()
      root_certs = None
      if roots is not None:
        logging.info('Using root CA %s', roots)
        with open(roots) as f:
          root_certs = f.read()
      if overd is not None:
        logging.info('Using TLS server override %s', overd)
        options = (('grpc.ssl_target_name_override', overd),)
      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
      self._channel = google_auth_transport_grpc.secure_authorized_channel(
          credentials, request, host, ssl_creds, options=options)
    else:
      raise ValueError('unknown transport %s (should be http[s])' % transport)
    self._stub = bytestream_pb2.ByteStreamStub(self._channel)

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    # TODO(aludwin): send the expected size of the item
    request.resource_name = '%sblobs/%s/0' % (self._prefix, digest)
    try:
      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Yields one chunk of content at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk

      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%suploads/%s/blobs/%s/%d' % (
            self._prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      try:
        response = self._stub.Write(slicer())
      except grpc.Call as c:
        # You might think that errors from gRPC would be grpc.RpcError. You'd
        # be... right... but it's *also* an instance of grpc.Call, and that's
        # where the status code actually lives.
        if c.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
          # The server already has this blob; there is no Write response to
          # validate, so the push is done.
          return
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % c)
          raise IOError(c)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state, we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


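# Illustrative note (not part of the original module): with, for example,
#   ISOLATED_GRPC_PROXY=https://proxy.example.com:8980/prefix
# IsolateServerGrpc parses transport 'https', host 'proxy.example.com:8980'
# and prefix 'prefix/', then reads blobs via ByteStream resource names of the
# form '<prefix>blobs/<digest>/0' and writes them via
# '<prefix>uploads/<uuid>/blobs/<digest>/<size>'. The proxy URL above is a
# placeholder, not a real endpoint.

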
def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of the isolate service to use for shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)
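

# Illustrative sketch (not part of the original module): typical use of the
# factory above. The server URL and output path are placeholders, not real
# values. To route traffic through gRPC instead,
# set_storage_api_class(IsolateServerGrpc) could be called once at startup,
# before the first get_storage_api() call.
def _example_download(digest, path):
  """Streams a single blob identified by |digest| into a local file."""
  storage = get_storage_api('https://isolate.example.com', 'default-gzip')
  with open(path, 'wb') as f:
    for chunk in storage.fetch(digest):
      f.write(chunk)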