#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

try:
  import grpc # for error codes
  from utils import grpc_proxy
  from proto import bytestream_pb2
  # If not present, grpc crashes later.
  import pyasn1_modules
except ImportError as err:
  grpc = None
  grpc_proxy = None
  bytestream_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024

# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None

class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


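# Illustrative sketch (not part of the original interface): a minimal Item
# subclass backed by an in-memory string. The class name is hypothetical; it
# only shows how content() is expected to yield str chunks and how prepare()
# derives the digest and size from them.
class _ExampleBufferItem(Item):
  """Example Item holding its data in memory; for illustration only."""

  def __init__(self, buf, high_priority=False):
    super(_ExampleBufferItem, self).__init__(
        digest=None, size=len(buf), high_priority=high_priority)
    self._buf = buf

  def content(self):
    yield self._buf

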
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, size, offset):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      size: size of the item to download if known, or None otherwise.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See doc string for 'push'.
    """
    raise NotImplementedError()


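# Illustrative sketch of the call sequence described in the docstrings above:
# prepare each Item, ask the server which ones are missing, then push only
# those. The function name is hypothetical; 'storage_api' is any StorageApi
# implementation and 'hash_algo' is the hash constructor matching its
# namespace (normally the higher-level Storage class drives this loop).
def _example_upload(storage_api, hash_algo, items):
  for item in items:
    item.prepare(hash_algo)
  for missing_item, push_state in storage_api.contains(items).items():
    storage_api.push(missing_item, push_state)

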
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size

def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # A quarter of sys.maxsize (one byte less than 512mb on 32 bit Python).
    # This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling to one upload at a time, we can survive. Otherwise,
        # kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)


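# Illustrative sketch of the duck-typed contract guard_memory_use() relies on:
# any object exposing a _lock mutex and a _memory_use counter works. The class
# name is hypothetical; IsolateServer and IsolateServerGrpc below are the real
# users.
class _ExampleGuardedServer(object):
  def __init__(self):
    self._lock = threading.Lock()
    self._memory_use = 0

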
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    algo = isolated_format.get_hash_algo(namespace)
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': isolated_format.SUPPORTED_ALGOS_REVERSE[algo],
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, _size, offset):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exists: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    if not response.get('url'):
      raise IOError(
          'Invalid response while fetching %s: %s' % (digest, response))

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to re-upload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'File %s is already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response.get('ok'):
          raise IOError(
              'Failed to finalize file with hash %s\n%r' %
              (item.digest, response))
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: does not pass on namespace to the server (uses it only for hash
  algo and compression), and only allows zero offsets while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate with server %s, '
                 'namespace %s, proxy %s',
                 server, namespace, proxy)
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0
    self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
    self._namespace = namespace

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, size, offset):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    if not size:
      size = -1
    request.resource_name = '%s/blobs/%s/%d' % (
        self._proxy.prefix, digest, size)
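    # The resource name built above has the form
    # '<proxy prefix>/blobs/<digest>/<size>', where the prefix comes from the
    # proxy configuration and size is -1 when the caller does not know it.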
    try:
      for response in self._proxy.get_stream('Read', request):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Yields one piece of content at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every piece of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
            self._proxy.prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      response = None
      try:
        response = self._proxy.call_no_retries('Write', slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state, we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)
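

# Illustrative usage sketch, not called anywhere in this module. The URL,
# namespace and digest below are hypothetical; this only shows how a caller
# selects a backend (call set_grpc_proxy() first to force gRPC, otherwise the
# HTTP IsolateServer is used) and streams a blob with fetch().
def _example_get_storage_api_usage():
  api = get_storage_api('https://isolate.example.com', 'default-gzip')
  for chunk in api.fetch('0' * 40, None, 0):
    sys.stdout.write(chunk)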