#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server."""

import base64
import binascii
import collections
import logging
import re
import sys
import threading
import time
import types

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
try:
  import grpc
  from proto import isolate_bot_pb2
except ImportError:
  grpc = None
  isolate_bot_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The class to use to communicate with the server by default. Can be changed
# by 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread and finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total

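# Illustrative sketch: a minimal Item subclass that serves a byte string from
# memory (the _InMemoryItem name and the hashlib usage here are hypothetical,
# shown only as an example). prepare() then derives its digest and size from
# content().
#
#   import hashlib
#
#   class _InMemoryItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(_InMemoryItem, self).__init__(high_priority=high_priority)
#       self._buf = buf
#
#     def content(self):
#       return [self._buf]
#
#   item = _InMemoryItem('hello')
#   item.prepare(hashlib.sha1)  # derives item.digest and item.size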

class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled by the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      offset: offset (in bytes) from the start of the file to resume fetch
          from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from the |content| generator when retrying a
    # push. If |content| is indeed a generator, it cannot be rewound back to
    # the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching
    # restartable generator. It should be done alongside the streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since that is what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling to one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)

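# Illustrative sketch of how callers are expected to pair this helper with a
# release (this mirrors the push() implementations below): account for the
# memory before serializing the content, and always release it in 'finally'.
#
#   guard_memory_use(server, content, size)
#   try:
#     ...  # serialize and upload |content|
#   finally:
#     with server._lock:
#       server._memory_use -= size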

class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by the /server_details
      endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that the server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % self._base_url
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exists: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset| is set, verify the server respects it by checking
    # Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
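      # For example: 'bytes 1024-2047/2048' or 'bytes 1024-2047/*'.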
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure the returned offset equals the requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure the entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; there is no need to reupload the contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'File %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to the isolate server. That way the isolate server can verify
        # that the data safely reached Google Storage (GS provides MD5 and
        # CRC32C of stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with a 'prepare' call. Storage does
    # that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if the file is already
    # present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from; can possibly return an http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once an HTTP GET
    # endpoint is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads an isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid a memcpy of the (possibly huge) file, until
    # streaming upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None

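# Illustrative usage sketch for IsolateServer (the server URL, digest and file
# name are hypothetical): stream a blob into a local file. Note that the
# yielded chunks are the raw stored bytes (compressed in '-gzip'/'-flate'
# namespaces); decompression is left to the higher-level Storage class.
#
#   digest = '0123456789abcdef0123456789abcdef01234567'  # SHA-1 hex digest
#   storage_api = IsolateServer('https://isolate.example.com', 'default-gzip')
#   with open('blob.flate', 'wb') as f:
#     for chunk in storage_api.fetch(digest):
#       f.write(chunk)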

class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero
  offsets while fetching.
  """

  def __init__(self, server, namespace):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')

    # Make sure grpc was successfully imported.
    assert grpc
    assert isolate_bot_pb2

    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces
    assert namespace == 'default-gzip'
    self._server = server
    self._channel = grpc.insecure_channel(server)
    self._stub = isolate_bot_pb2.FileServiceStub(self._channel)
    self._lock = threading.Lock()
    self._memory_use = 0
    logging.info('...gRPC successfully initialized')

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    # This is used to determine if the data is compressed, but gRPC proxies
    # don't have a concept of 'namespaces' and natively compress all messages
    # before transmission. So return an unlikely-to-be-used name so that
    # isolateserver doesn't try to compress anything.
    return 'grpc-proxy'

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = isolate_bot_pb2.FetchBlobsRequest()
    req_digest = request.digest.add()
    # Convert the utf-8 encoded hexadecimal string (like '012abc') to a byte
    # array (like [0x01, 0x2a, 0xbc]).
    req_digest.digest = binascii.unhexlify(digest)
    expected_offset = 0
    try:
      for response in self._stub.FetchBlobs(request,
                                            timeout=DOWNLOAD_READ_TIMEOUT):
        if not response.status.succeeded:
          raise IOError(
              'Error while fetching %s: %s' % (digest, response.status))
        if expected_offset != response.data.offset:
          raise IOError(
              'Error while fetching %s: expected offset %d, got %d' % (
                  digest, expected_offset, response.data.offset))
        expected_offset += len(response.data.data)
        yield response.data.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)

    try:
      def chunker():
        # Yields one chunk of content at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
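        # For example, with NET_IO_FILE_CHUNK of 16 KiB, a 40 KiB chunk from
        # chunker() becomes three PushBlobsRequest messages of 16 KiB, 16 KiB
        # and 8 KiB, with data.offset 0, 16384 and 32768; only the first
        # message carries the digest.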
        request = isolate_bot_pb2.PushBlobsRequest()
        request.data.digest.digest = binascii.unhexlify(item.digest)
        request.data.digest.size_bytes = item.size
        request.data.offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data.data = chunk[:slice_len]
            yield request
            has_sent_anything = True
            request.data.offset += slice_len
            # The proxy only expects the first chunk to have the digest.
            request.data.ClearField("digest")
            chunk = chunk[slice_len:]

      # TODO(aludwin): batch up several requests to reuse TCP connections.
      try:
        response = self._stub.PushBlobs(slicer())
      except grpc.RpcError as g:
        logging.error('gRPC error during push: re-throwing as IOError (%s)' % g)
        raise IOError(g)

      if not response.status.succeeded:
        raise IOError(
            'Error while uploading %s: %s' % (
                item.digest, response.status.error_detail))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns a dict of all missing items, mapped to their push states."""
    # Ensure all items were initialized with a 'prepare' call. Storage does
    # that.
    assert all(i.digest is not None and i.size is not None for i in items)
    request = isolate_bot_pb2.ContainsRequest()
    items_by_digest = {}
    for item in items:
      cd = request.digest.add()
      cd.digest = binascii.unhexlify(item.digest)
      items_by_digest[cd.digest] = item
    try:
      response = self._stub.Contains(request)
    except grpc.RpcError as g:
      logging.error('gRPC error during contains: re-throwing as IOError (%s)'
                    % g)
      raise IOError(g)

    # If everything's present, return the empty dict.
    if response.status.succeeded:
      return {}

    if response.status.error != isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
      raise IOError('Unknown response during lookup: %s' % response.status)

    # Pick Items that are missing, attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state; we just attach
    # empty objects to satisfy the StorageApi interface.
    missing_items = {}
    for missing in response.status.missing_digest:
      item = items_by_digest[missing.digest]
      missing_items[item] = _IsolateServerGrpcPushState()

    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

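# Illustrative sketch (the proxy address and digest are hypothetical):
# IsolateServerGrpc talks to a gRPC proxy rather than the HTTP Isolate Server,
# and only accepts the 'default-gzip' namespace (see __init__).
#
#   storage_api = IsolateServerGrpc('localhost:9090', 'default-gzip')
#   data = ''.join(storage_api.fetch(digest))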

def set_storage_api_class(cls):
  """Replaces the StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls

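# Illustrative sketch (FakeStorageApi is a hypothetical subclass): tests or
# alternate backends can install their own implementation before the first
# call to get_storage_api().
#
#   set_storage_api_class(FakeStorageApi)
#   storage_api = get_storage_api(url, namespace)  # instantiates FakeStorageApi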

def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of the isolate service to use as shared cloud-based storage.
    namespace: isolate namespace to operate in; it also defines the hashing
        and compression scheme used, i.e. namespace names that end with
        '-gzip' store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)
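

# Illustrative sketch (the server URL is hypothetical): get_storage_api()
# returns the default implementation, IsolateServer, unless
# set_storage_api_class() installed another one.
#
#   storage_api = get_storage_api(
#       'https://isolate.example.com', 'default-gzip')
#   assert storage_api.namespace == 'default-gzip'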