#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server."""

import base64
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

try:
  import grpc  # for error codes
  from utils import grpc_proxy
  from proto import bytestream_pb2
except ImportError as err:
  grpc = None
  grpc_proxy = None
  bytestream_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread, and finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total

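# A minimal sketch of a concrete Item holding an in-memory string (purely
# illustrative; concrete subclasses live in the higher-level client code):
#
#   class BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(BufferItem, self).__init__(None, len(buf), high_priority)
#       self._buf = buf
#
#     def content(self):
#       return [self._buf]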

class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, size, offset):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      size: size of the item to download if known, or None otherwise.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)

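# Callers are responsible for releasing the reservation made by
# guard_memory_use() once the upload finishes, as both push() implementations
# below do:
#
#   guard_memory_use(self, content, push_state.size)
#   try:
#     ...  # upload
#   finally:
#     with self._lock:
#       self._memory_use -= push_state.size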

class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    algo = isolated_format.get_hash_algo(namespace)
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': isolated_format.SUPPORTED_ALGOS_REVERSE[algo],
        'namespace': namespace,
    }
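    # For example (illustrative values only, assuming the default SHA-1
    # hashing scheme), the namespace 'default-gzip' would yield:
    #   {'compression': 'flate', 'digest_hash': 'sha-1',
    #    'namespace': 'default-gzip'}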
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, _size, offset):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to re-upload the contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance holding the upload and
          finalize URLs.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: does not pass on namespace to the server (uses it only for hash
  algo and compression), and only allows zero offsets while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate with server %s, '
                 'namespace %s, proxy %s',
                 server, namespace, proxy)
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0
    self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
    self._namespace = namespace

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, size, offset):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    if not size:
      size = -1
    request.resource_name = '%s/blobs/%s/%d' % (
        self._proxy.prefix, digest, size)
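    # Resource name format is '<proxy prefix>/blobs/<digest>/<size>'; a size
    # of -1 means the size is not known in advance.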
    try:
      for response in self._proxy.get_stream('Read', request):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Returns one bit of content at a time
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
            self._proxy.prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]
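      # Illustrative request sequence (not part of the original code): for a
      # 40 KiB blob arriving as a single chunk, with 16 KiB slices, slicer()
      # yields WriteRequests with write_offset 0, 16384 and 32768; only the
      # last one has finish_write set to True.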

      response = None
      try:
        response = self._proxy.call_no_retries('Write', slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state; we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, e.g. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)
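
# Example usage sketch (hypothetical URL and item list; mirrors the flow
# described in StorageApi.push above):
#
#   storage_api = get_storage_api('https://isolate.example.com', 'default-gzip')
#   for missing_item, push_state in storage_api.contains(all_items).items():
#     storage_api.push(missing_item, push_state)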