#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

try:
  import grpc # for error codes
  from utils import grpc_proxy
  from proto import bytestream_pb2
  # If not present, grpc crashes later.
  import pyasn1_modules
except ImportError as err:
  grpc = None
  grpc_proxy = None
  bytestream_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total

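# A minimal sketch of a concrete Item (hypothetical, not part of this module):
# an in-memory buffer whose digest and size get filled in by prepare().
#
#   class _BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(_BufferItem, self).__init__(high_priority=high_priority)
#       self._buf = buf
#
#     def content(self):
#       yield self._buf
#
#   item = _BufferItem('hello')
#   item.prepare(isolated_format.get_hash_algo('default-gzip'))
#   # item.digest is now the hex digest of 'hello'; item.size is 5.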

class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled by the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
99 """
100
101 @property
102 def location(self):
103 """URL of the backing store that this class is using."""
104 raise NotImplementedError()
105
106 @property
107 def namespace(self):
108 """Isolate namespace used by this storage.
109
110 Indirectly defines hashing scheme and compression method used.
111 """
112 raise NotImplementedError()
113
aludwinef01fa32017-06-29 12:01:03 -0700114 @property
115 def internal_compression(self):
116 """True if this class doesn't require external compression.
117
118 If true, callers should not compress items, even if the namespace indicates
119 otherwise. Compression will be performed by the StorageApi class.
120 """
121 return False
122
Adrian Ludwinb5b05312017-09-13 07:46:24 -0400123 def fetch(self, digest, size, offset):
aludwin81178302016-11-30 17:18:49 -0800124 """Fetches an object and yields its content.
125
126 Arguments:
127 digest: hash digest of item to download.
Adrian Ludwinb5b05312017-09-13 07:46:24 -0400128 size: size of the item to download if known, or None otherwise.
aludwin81178302016-11-30 17:18:49 -0800129 offset: offset (in bytes) from the start of the file to resume fetch from.
130
131 Yields:
132 Chunks of downloaded item (as str objects).
133 """
134 raise NotImplementedError()
135
136 def push(self, item, push_state, content=None):
137 """Uploads an |item| with content generated by |content| generator.
138
139 |item| MUST go through 'contains' call to get |push_state| before it can
140 be pushed to the storage.
141
142 To be clear, here is one possible usage:
143 all_items = [... all items to push as Item subclasses ...]
144 for missing_item, push_state in storage_api.contains(all_items).items():
145 storage_api.push(missing_item, push_state)
146
    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and yields chunks of compressed data (using
    item.content() as the source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
172 """
173 raise NotImplementedError()
174
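# A minimal usage sketch for a StorageApi implementation; the higher-level
# Storage class is the supported entry point (see the class docstring above).
# 'storage_api', 'all_items', 'digest' and 'size' are assumed to exist:
#
#   # Upload whatever the server reports as missing.
#   for missing_item, push_state in storage_api.contains(all_items).items():
#     storage_api.push(missing_item, push_state)
#
#   # Download a blob and reassemble the yielded str chunks.
#   data = ''.join(storage_api.fetch(digest, size, 0))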

class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size

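# Illustrative preupload_status shapes (assumed; based on the fields read by
# _IsolateServerPushState.__init__ and IsolateServer.contains):
#
#   # Small item stored inline in the isolate server datastore:
#   {'index': '0', 'upload_ticket': '...'}
#   #   -> upload_url is 'api/isolateservice/v1/store_inline', no finalize.
#
#   # Large item uploaded straight to Google Storage, then finalized:
#   {'index': '1', 'upload_ticket': '...', 'gs_upload_url': 'https://...'}
#   #   -> upload_url is the signed GS URL and finalize_url is
#   #      'api/isolateservice/v1/finalize_gs_upload'.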

def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32 bits python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)

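# A rough usage sketch, mirroring how IsolateServer.push and
# IsolateServerGrpc.push use this helper: reserve memory before serializing
# the content, then release it in a 'finally' block.
#
#   guard_memory_use(server, content, size)  # may sleep until memory frees up
#   try:
#     ...  # serialize and upload |content|
#   finally:
#     with server._lock:
#       server._memory_use -= size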

class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
251 """
252
253 def __init__(self, base_url, namespace):
254 super(IsolateServer, self).__init__()
255 assert file_path.is_url(base_url), base_url
256 self._base_url = base_url.rstrip('/')
257 self._namespace = namespace
Adrian Ludwinb5b05312017-09-13 07:46:24 -0400258 algo = isolated_format.get_hash_algo(namespace)
aludwin81178302016-11-30 17:18:49 -0800259 self._namespace_dict = {
260 'compression': 'flate' if namespace.endswith(
261 ('-gzip', '-flate')) else '',
Adrian Ludwinb5b05312017-09-13 07:46:24 -0400262 'digest_hash': isolated_format.SUPPORTED_ALGOS_REVERSE[algo],
aludwin81178302016-11-30 17:18:49 -0800263 'namespace': namespace,
264 }
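    # For example, the common 'default-gzip' namespace would typically map to
    # {'compression': 'flate', 'digest_hash': 'sha-1',
    #  'namespace': 'default-gzip'}, assuming that namespace hashes with SHA-1.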
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, _size, offset):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*', meaning "Total size of the
      # file is not known in advance".
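      # For example, resuming a 500 byte file at offset=100 is expected to
      # return 'bytes 100-499/500' (or 'bytes 100-499/*' if the total size is
      # unknown).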
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response.get('ok'):
          raise IOError(
              'Failed to finalize file with hash %s\n%r' %
              (item.digest, response))
      push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
468 """Uploads isolated file to the URL.
469
470 Used only for storing files, not for API calls. Can be overridden in
471 subclasses.
472
473 Args:
474 url: URL to upload the data to.
475 push_state: an _IsolateServicePushState instance
476 item: the original Item to be uploaded
477 content: an iterable that yields 'str' chunks.
478 """
479 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
480 # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
509 """Empty class, just to present same interface as IsolateServer """

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: does not pass on namespace to the server (uses it only for hash
  algo and compression), and only allows zero offsets while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate with server %s, '
                 'namespace %s, proxy %s',
                 server, namespace, proxy)
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0
    self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
    self._namespace = namespace


  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, size, offset):
    # The gRPC APIs only work with an offset of 0
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    if not size:
      size = -1
    request.resource_name = '%s/blobs/%s/%d' % (
        self._proxy.prefix, digest, size)
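    # The resulting resource name is expected to look like
    # '<proxy prefix>/blobs/<hex digest>/<size in bytes>'; -1 is sent when the
    # size is not known.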
    try:
      for response in self._proxy.get_stream('Read', request):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Yields one chunk of content at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
            self._proxy.prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

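      # Illustrative slicing, assuming NET_IO_FILE_CHUNK is 16 KiB: a 40 KiB
      # blob becomes three WriteRequests with write_offset 0, 16384 and 32768,
      # data slices of 16 KiB, 16 KiB and 8 KiB, and finish_write set only on
      # the last one. A zero-length blob still yields one (empty) request.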
      response = None
      try:
        response = self._proxy.call_no_retries('Write', slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state, we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)
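

# A minimal usage sketch with hypothetical values; the proxy string is
# whatever utils.grpc_proxy.Proxy accepts:
#
#   set_grpc_proxy('ipc:///tmp/isolate-proxy')  # optional, enables gRPC mode
#   storage_api = get_storage_api(
#       'https://example-isolate-server.appspot.com', 'default-gzip')
#   missing = storage_api.contains(items)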