#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

try:
  import grpc # for error codes
  from utils import grpc_proxy
  from proto import bytestream_pb2
  # If not present, grpc crashes later.
  import pyasn1_modules
except ImportError as err:
  grpc = None
  grpc_proxy = None
  bytestream_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024

# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
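
  For illustration only, a sketch of one possible minimal in-memory subclass
  (the name and fields below are not part of this interface):

    class BufferItem(Item):
      def __init__(self, buf, high_priority=False):
        super(BufferItem, self).__init__(None, len(buf), high_priority)
        self.buffer = buf

      def content(self):
        return [self.buffer]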
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, size, offset):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      size: size of the item to download if known, or None otherwise.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
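
  Callers are expected to release the accounted memory themselves once the
  upload finishes; both IsolateServer.push and IsolateServerGrpc.push do this
  by decrementing server._memory_use in a 'finally' block.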
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
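    # (On 32 bit Python, sys.maxsize is 2**31 - 1, so this evaluates to one
    # byte under 512 MiB; on 64 bit Python the resulting limit is much larger.)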
    while True:
      with server._lock:
        # This is due to 32 bits python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    algo = isolated_format.get_hash_algo(namespace)
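    # For example, the 'default-gzip' namespace typically maps to:
    #   {'compression': 'flate', 'digest_hash': 'sha-1',
    #    'namespace': 'default-gzip'}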
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': isolated_format.SUPPORTED_ALGOS_REVERSE[algo],
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, _size, offset):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
            source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
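      # For instance: 'bytes 1024-2047/4096', or 'bytes 1024-2047/*' when the
      # total size is unknown.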
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: does not pass on namespace to the server (uses it only for hash
  algo and compression), and only allows zero offsets while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate with server %s, '
                 'namespace %s, proxy %s',
                 server, namespace, proxy)
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0
    self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
    self._namespace = namespace

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, size, offset):
    # The gRPC APIs only work with an offset of 0
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    if not size:
      size = -1
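    # Bytestream 'Read' resource names have the form
    # '<proxy prefix>/blobs/<digest>/<size>'; -1 is passed when the size is
    # not known.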
    request.resource_name = '%s/blobs/%s/%d' % (
        self._proxy.prefix, digest, size)
    try:
      for response in self._proxy.get_stream('Read', request):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Returns one bit of content at a time
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
            self._proxy.prefix, u, item.digest, item.size)
        request.write_offset = 0
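        # A single WriteRequest instance is reused and mutated in place; since
        # this is a generator, each yielded message is consumed by the gRPC
        # layer before the next slice overwrites its fields.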
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      response = None
      try:
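        # call_no_retries is used here; retrying would not help anyway since
        # slicer() is a one-shot generator that cannot be replayed once
        # consumed.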
        response = self._proxy.call_no_retries('Write', slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state, we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of StorageApi subclass.
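
  One possible usage (illustrative URL and namespace):
    storage = get_storage_api('https://isolate.example.com', 'default-gzip')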
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)