#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import binascii
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
# Full external requirements are: grpcio, certifi.
try:
  import grpc
  from google import auth as google_auth
  from google.auth.transport import grpc as google_auth_transport_grpc
  from google.auth.transport import requests as google_auth_transport_requests
  from proto import bytestream_pb2
except ImportError as err:
  grpc = None
  bytestream_pb2 = None

# If gRPC is installed, at least give a warning if certifi is not. certifi is
# not actually used anywhere in this module, but if it is missing,
# google.auth.transport will fail with the SSL error described in
# https://stackoverflow.com/questions/24973326
if grpc is not None:
  try:
    import certifi
  except ImportError as err:
    logging.warning('could not import certifi; gRPC HTTPS connections may fail')

# Chunk size to use when reading from the network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The StorageApi class used to communicate with the server by default. Can be
# changed by 'set_storage_api_class'. Defaults to IsolateServer.
_storage_api_cls = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread, and finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with the content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total
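
# Example (an illustrative sketch, not part of this module): a minimal
# in-memory Item subclass and how its digest is computed. hashlib.sha1 matches
# the 'sha-1' digest_hash that IsolateServer advertises.
#
#   import hashlib
#
#   class BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(BufferItem, self).__init__(
#           digest=None, size=len(buf), high_priority=high_priority)
#       self._buf = buf
#
#     def content(self):
#       # A one-chunk iterable is enough for a small in-memory payload.
#       return [self._buf]
#
#   item = BufferItem('hello world')
#   item.prepare(hashlib.sha1)  # fills in item.digest and item.size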


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
    """
    raise NotImplementedError()
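
# Typical consumer flow against any StorageApi implementation (an illustrative
# sketch; 'storage_api', 'items', 'digest' and 'out_path' are placeholder
# names):
#
#   # Upload whatever the server reports as missing.
#   for missing_item, push_state in storage_api.contains(items).items():
#     storage_api.push(missing_item, push_state)
#
#   # Stream a blob to disk chunk by chunk; fetch() yields str chunks.
#   with open(out_path, 'wb') as f:
#     for chunk in storage_api.fetch(digest):
#       f.write(chunk)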


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a module-level class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is needed to cope with 32-bit Python when uploading very large
        # files. The problem is that it's comparing uncompressed sizes, while
        # we care about compressed sizes since that is what is serialized in
        # memory. The first check assumes large files are compressible and
        # that by throttling to one upload at a time, we can survive.
        # Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)
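
# Intended usage pattern (a sketch mirroring IsolateServer.push below):
# reserve the memory before serializing the upload body, and always release it
# under the same lock once the upload finishes or fails.
#
#   guard_memory_use(self, content, push_state.size)
#   try:
#     ...  # serialize |content| and perform the upload
#   finally:
#     with self._lock:
#       self._memory_use -= push_state.size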


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
      'compression': 'flate' if namespace.endswith(
          ('-gzip', '-flate')) else '',
      'digest_hash': 'sha-1',
      'namespace': namespace,
    }
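    # For example, for the 'default-gzip' namespace this is:
    #   {'compression': 'flate', 'digest_hash': 'sha-1',
    #    'namespace': 'default-gzip'}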
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by the /server_details
      endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exists: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify the server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
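      # For example, resuming at offset 100 of a 500-byte file yields
      # 'Content-Range: bytes 100-499/500' (or 'bytes 100-499/*').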
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure the returned offset equals the requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure the entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; there is no need to reupload the contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'File %s is already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
              'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
      'items': [
        {
          'digest': item.digest,
          'is_isolated': bool(item.high_priority),
          'size': item.size,
        } for item in items
      ],
      'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if the file is already
    # present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
      'digest': digest.encode('utf-8'),
      'namespace': self._namespace_dict,
      'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads an isolated file to the upload URL from |push_state|.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
        'upload_ticket': push_state.preupload_status['upload_ticket'],
        'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0

    # Make sure grpc was successfully imported.
    assert grpc
    assert bytestream_pb2
    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces if necessary.
    assert namespace == 'default-gzip'

    proxy = os.environ.get('ISOLATED_GRPC_PROXY', '')
    roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS')
    overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE')

    # The "proxy" envvar must be of the form:
    # http[s]://<server>[:port][/prefix]
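    # For example (a hypothetical value):
    #   ISOLATED_GRPC_PROXY=https://grpc-proxy.example.com:8980/prefix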
    m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
    if not m:
      raise ValueError(('gRPC proxy must have the form: '
                        'http[s]://<server>[:port][/prefix] '
                        '(given: %s)') % proxy)
    transport = m.group(1)
    host = m.group(2)
    prefix = m.group(3)
    if not prefix.endswith('/'):
      prefix = prefix + '/'
    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
                 transport, host, prefix)
    self._prefix = prefix

    if transport == 'http':
      self._channel = grpc.insecure_channel(host)
    elif transport == 'https':
      # Using cloud container builder scopes for testing:
      scopes = ('https://www.googleapis.com/auth/cloud-build-service',)
      credentials, _ = google_auth.default(scopes=scopes)
      request = google_auth_transport_requests.Request()
      options = ()
      root_certs = None
      if roots is not None:
        logging.info('Using root CA %s', roots)
        with open(roots) as f:
          root_certs = f.read()
      if overd is not None:
        logging.info('Using TLS server override %s', overd)
        options = (('grpc.ssl_target_name_override', overd),)
      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
      self._channel = google_auth_transport_grpc.secure_authorized_channel(
          credentials, request, host, ssl_creds, options=options)
    else:
      raise ValueError('unknown transport %s (should be http[s])' % transport)
    self._stub = bytestream_pb2.ByteStreamStub(self._channel)

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    # This is used to determine if the data is compressed, but gRPC proxies
    # don't have concepts of 'namespaces' and natively compress all messages
    # before transmission. So return an unlikely-to-be-used name so that
    # isolateserver doesn't try to compress anything.
    return 'grpc-proxy'

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    # TODO(aludwin): send the expected size of the item.
    request.resource_name = '%sblobs/%s/0' % (
        self._prefix, digest)
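    # Produces e.g. 'some-prefix/blobs/<hex digest>/0' when self._prefix is
    # 'some-prefix/'.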
    try:
      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Returns one bit of content at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk

      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%suploads/%s/blobs/%s/%d' % (
            self._prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      try:
        response = self._stub.Write(slicer())
      except grpc.Call as c:
        # You might think that errors from gRPC would be grpc.RpcError. You'd
        # be... right... but it's *also* an instance of grpc.Call, and that's
        # where the status code actually lives.
        if c.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed, so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % c)
          raise IOError(c)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state; we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_storage_api_class(cls):
  """Replaces the StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of the isolate service to use for shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression scheme used, e.g. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)
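

# Example usage (an illustrative sketch; the server URL is a hypothetical
# value):
#
#   # IsolateServer is the default; a bot can swap in the gRPC implementation:
#   # set_storage_api_class(IsolateServerGrpc)
#   storage_api = get_storage_api(
#       'https://isolate.example.appspot.com', 'default-gzip')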