blob: ef0d79eae0d7a0899af1ba19600363976f7b0493 [file] [log] [blame]
aludwin81178302016-11-30 17:18:49 -08001#!/usr/bin/env python
2# Copyright 2016 The LUCI Authors. All rights reserved.
3# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
5
6"""A low-level blob storage/retrieval interface to the Isolate server"""
7
8import base64
aludwinbdcd1bb2016-12-08 08:39:53 -08009import collections
aludwin81178302016-11-30 17:18:49 -080010import logging
aludwin120e37e2017-06-28 13:05:58 -070011import os
aludwin81178302016-11-30 17:18:49 -080012import re
13import sys
14import threading
15import time
16import types
aludwin120e37e2017-06-28 13:05:58 -070017import uuid
aludwin81178302016-11-30 17:18:49 -080018
19from utils import file_path
20from utils import net
21
22import isolated_format
23
aludwin81178302016-11-30 17:18:49 -080024try:
aludwind5a67882017-08-04 12:58:10 -070025 import grpc # for error codes
26 from utils import grpc_proxy
aludwin120e37e2017-06-28 13:05:58 -070027 from proto import bytestream_pb2
28except ImportError as err:
aludwin81178302016-11-30 17:18:49 -080029 grpc = None
aludwind5a67882017-08-04 12:58:10 -070030 grpc_proxy = None
aludwin120e37e2017-06-28 13:05:58 -070031 bytestream_pb2 = None
aludwin81178302016-11-30 17:18:49 -080032
33
34# Chunk size to use when reading from network stream.
35NET_IO_FILE_CHUNK = 16 * 1024
36
37
38# Read timeout in seconds for downloads from isolate storage. If there's no
39# response from the server within this timeout whole download will be aborted.
40DOWNLOAD_READ_TIMEOUT = 60
41
42
aludwin758d2462017-06-29 16:38:50 -070043# Stores the gRPC proxy address. Must be set if the storage API class is
44# IsolateServerGrpc (call 'set_grpc_proxy').
45_grpc_proxy = None
aludwin81178302016-11-30 17:18:49 -080046
47
48class Item(object):
49 """An item to push to Storage.
50
51 Its digest and size may be provided in advance, if known. Otherwise they will
52 be derived from content(). If digest is provided, it MUST correspond to
53 hash algorithm used by Storage.
54
55 When used with Storage, Item starts its life in a main thread, travels
56 to 'contains' thread, then to 'push' thread and then finally back to
57 the main thread. It is never used concurrently from multiple threads.
58 """
59
60 def __init__(self, digest=None, size=None, high_priority=False):
61 self.digest = digest
62 self.size = size
63 self.high_priority = high_priority
64 self.compression_level = 6
65
66 def content(self):
67 """Iterable with content of this item as byte string (str) chunks."""
68 raise NotImplementedError()
69
70 def prepare(self, hash_algo):
71 """Ensures self.digest and self.size are set.
72
73 Uses content() as a source of data to calculate them. Does nothing if digest
74 and size is already known.
75
76 Arguments:
77 hash_algo: hash algorithm to use to calculate digest.
78 """
79 if self.digest is None or self.size is None:
80 digest = hash_algo()
81 total = 0
82 for chunk in self.content():
83 digest.update(chunk)
84 total += len(chunk)
85 self.digest = digest.hexdigest()
86 self.size = total
87
88
89class StorageApi(object):
90 """Interface for classes that implement low-level storage operations.
91
92 StorageApi is oblivious of compression and hashing scheme used. This details
93 are handled in higher level Storage class.
94
95 Clients should generally not use StorageApi directly. Storage class is
96 preferred since it implements compression and upload optimizations.
97 """
98
99 @property
100 def location(self):
101 """URL of the backing store that this class is using."""
102 raise NotImplementedError()
103
104 @property
105 def namespace(self):
106 """Isolate namespace used by this storage.
107
108 Indirectly defines hashing scheme and compression method used.
109 """
110 raise NotImplementedError()
111
aludwinef01fa32017-06-29 12:01:03 -0700112 @property
113 def internal_compression(self):
114 """True if this class doesn't require external compression.
115
116 If true, callers should not compress items, even if the namespace indicates
117 otherwise. Compression will be performed by the StorageApi class.
118 """
119 return False
120
aludwin81178302016-11-30 17:18:49 -0800121 def fetch(self, digest, offset=0):
122 """Fetches an object and yields its content.
123
124 Arguments:
125 digest: hash digest of item to download.
126 offset: offset (in bytes) from the start of the file to resume fetch from.
127
128 Yields:
129 Chunks of downloaded item (as str objects).
130 """
131 raise NotImplementedError()
132
133 def push(self, item, push_state, content=None):
134 """Uploads an |item| with content generated by |content| generator.
135
136 |item| MUST go through 'contains' call to get |push_state| before it can
137 be pushed to the storage.
138
139 To be clear, here is one possible usage:
140 all_items = [... all items to push as Item subclasses ...]
141 for missing_item, push_state in storage_api.contains(all_items).items():
142 storage_api.push(missing_item, push_state)
143
144 When pushing to a namespace with compression, data that should be pushed
145 and data provided by the item is not the same. In that case |content| is
146 not None and it yields chunks of compressed data (using item.content() as
147 a source of original uncompressed data). This is implemented by Storage
148 class.
149
150 Arguments:
151 item: Item object that holds information about an item being pushed.
152 push_state: push state object as returned by 'contains' call.
153 content: a generator that yields chunks to push, item.content() if None.
154
155 Returns:
156 None.
157 """
158 raise NotImplementedError()
159
160 def contains(self, items):
161 """Checks for |items| on the server, prepares missing ones for upload.
162
163 Arguments:
164 items: list of Item objects to check for presence.
165
166 Returns:
167 A dict missing Item -> opaque push state object to be passed to 'push'.
168 See doc string for 'push'.
169 """
170 raise NotImplementedError()
171
172
173class _IsolateServerPushState(object):
174 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
175
176 Note this needs to be a global class to support pickling.
177 """
178
179 def __init__(self, preupload_status, size):
180 self.preupload_status = preupload_status
181 gs_upload_url = preupload_status.get('gs_upload_url') or None
182 if gs_upload_url:
183 self.upload_url = gs_upload_url
184 self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
185 else:
186 self.upload_url = 'api/isolateservice/v1/store_inline'
187 self.finalize_url = None
188 self.uploaded = False
189 self.finalized = False
190 self.size = size
191
192
193def guard_memory_use(server, content, size):
194 """Guards a server against using excessive memory while uploading.
195
196 The server needs to contain a _memory_use int and a _lock mutex
197 (both IsolateServer and IsolateServerGrpc qualify); this function
198 then uses those values to track memory usage in a thread-safe way.
199
200 If a request would cause the memory usage to exceed a safe maximum,
201 this function sleeps in 0.1s increments until memory usage falls
202 below the maximum.
203 """
204 if isinstance(content, (basestring, list)):
205 # Memory is already used, too late.
206 with server._lock:
207 server._memory_use += size
208 else:
209 # TODO(vadimsh): Do not read from |content| generator when retrying push.
210 # If |content| is indeed a generator, it can not be re-winded back to the
211 # beginning of the stream. A retry will find it exhausted. A possible
212 # solution is to wrap |content| generator with some sort of caching
213 # restartable generator. It should be done alongside streaming support
214 # implementation.
215 #
216 # In theory, we should keep the generator, so that it is not serialized in
217 # memory. Sadly net.HttpService.request() requires the body to be
218 # serialized.
219 assert isinstance(content, types.GeneratorType), repr(content)
220 slept = False
221 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
222 # One byte less than 512mb. This is to cope with incompressible content.
223 max_size = int(sys.maxsize * 0.25)
224 while True:
225 with server._lock:
226 # This is due to 32 bits python when uploading very large files. The
227 # problem is that it's comparing uncompressed sizes, while we care
228 # about compressed sizes since it's what is serialized in memory.
229 # The first check assumes large files are compressible and that by
230 # throttling one upload at once, we can survive. Otherwise, kaboom.
231 memory_use = server._memory_use
232 if ((size >= max_size and not memory_use) or
233 (memory_use + size <= max_size)):
234 server._memory_use += size
235 memory_use = server._memory_use
236 break
237 time.sleep(0.1)
238 slept = True
239 if slept:
240 logging.info('Unblocked: %d %d', memory_use, size)
241
242
243class IsolateServer(StorageApi):
244 """StorageApi implementation that downloads and uploads to Isolate Server.
245
246 It uploads and downloads directly from Google Storage whenever appropriate.
247 Works only within single namespace.
248 """
249
250 def __init__(self, base_url, namespace):
251 super(IsolateServer, self).__init__()
252 assert file_path.is_url(base_url), base_url
253 self._base_url = base_url.rstrip('/')
254 self._namespace = namespace
255 self._namespace_dict = {
256 'compression': 'flate' if namespace.endswith(
257 ('-gzip', '-flate')) else '',
258 'digest_hash': 'sha-1',
259 'namespace': namespace,
260 }
261 self._lock = threading.Lock()
262 self._server_caps = None
263 self._memory_use = 0
264
265 @property
266 def _server_capabilities(self):
267 """Gets server details.
268
269 Returns:
270 Server capabilities dictionary as returned by /server_details endpoint.
271 """
272 # TODO(maruel): Make this request much earlier asynchronously while the
273 # files are being enumerated.
274
275 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
276 # namespace-level ACLs to this call.
277
278 with self._lock:
279 if self._server_caps is None:
280 self._server_caps = net.url_read_json(
281 url='%s/api/isolateservice/v1/server_details' % self._base_url,
282 data={})
283 return self._server_caps
284
285 @property
286 def location(self):
287 return self._base_url
288
289 @property
290 def namespace(self):
291 return self._namespace
292
293 def fetch(self, digest, offset=0):
294 assert offset >= 0
295 source_url = '%s/api/isolateservice/v1/retrieve' % (
296 self._base_url)
297 logging.debug('download_file(%s, %d)', source_url, offset)
298 response = self._do_fetch(source_url, digest, offset)
299
300 if not response:
301 raise IOError(
302 'Attempted to fetch from %s; no data exist: %s / %s.' % (
303 source_url, self._namespace, digest))
304
305 # for DB uploads
306 content = response.get('content')
307 if content is not None:
308 yield base64.b64decode(content)
309 return
310
311 # for GS entities
312 connection = net.url_open(response['url'])
313 if not connection:
314 raise IOError('Failed to download %s / %s' % (self._namespace, digest))
315
316 # If |offset|, verify server respects it by checking Content-Range.
317 if offset:
318 content_range = connection.get_header('Content-Range')
319 if not content_range:
320 raise IOError('Missing Content-Range header')
321
322 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
323 # According to a spec, <size> can be '*' meaning "Total size of the file
324 # is not known in advance".
325 try:
326 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
327 if not match:
328 raise ValueError()
329 content_offset = int(match.group(1))
330 last_byte_index = int(match.group(2))
331 size = None if match.group(3) == '*' else int(match.group(3))
332 except ValueError:
333 raise IOError('Invalid Content-Range header: %s' % content_range)
334
335 # Ensure returned offset equals requested one.
336 if offset != content_offset:
337 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
338 offset, content_offset, content_range))
339
340 # Ensure entire tail of the file is returned.
341 if size is not None and last_byte_index + 1 != size:
342 raise IOError('Incomplete response. Content-Range: %s' % content_range)
343
344 for data in connection.iter_content(NET_IO_FILE_CHUNK):
345 yield data
346
347 def push(self, item, push_state, content=None):
348 assert isinstance(item, Item)
349 assert item.digest is not None
350 assert item.size is not None
351 assert isinstance(push_state, _IsolateServerPushState)
352 assert not push_state.finalized
353
354 # Default to item.content().
355 content = item.content() if content is None else content
356 logging.info('Push state size: %d', push_state.size)
357 guard_memory_use(self, content, push_state.size)
358
359 try:
360 # This push operation may be a retry after failed finalization call below,
361 # no need to reupload contents in that case.
362 if not push_state.uploaded:
363 # PUT file to |upload_url|.
364 success = self._do_push(push_state, content)
365 if not success:
366 raise IOError('Failed to upload file with hash %s to URL %s' % (
367 item.digest, push_state.upload_url))
368 push_state.uploaded = True
369 else:
370 logging.info(
371 'A file %s already uploaded, retrying finalization only',
372 item.digest)
373
374 # Optionally notify the server that it's done.
375 if push_state.finalize_url:
376 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
377 # send it to isolated server. That way isolate server can verify that
378 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
379 # stored files).
380 # TODO(maruel): Fix the server to accept properly data={} so
381 # url_read_json() can be used.
382 response = net.url_read_json(
383 url='%s/%s' % (self._base_url, push_state.finalize_url),
384 data={
385 'upload_ticket': push_state.preupload_status['upload_ticket'],
386 })
387 if not response or not response['ok']:
388 raise IOError('Failed to finalize file with hash %s.' % item.digest)
389 push_state.finalized = True
390 finally:
391 with self._lock:
392 self._memory_use -= push_state.size
393
394 def contains(self, items):
395 # Ensure all items were initialized with 'prepare' call. Storage does that.
396 assert all(i.digest is not None and i.size is not None for i in items)
397
398 # Request body is a json encoded list of dicts.
399 body = {
400 'items': [
401 {
402 'digest': item.digest,
403 'is_isolated': bool(item.high_priority),
404 'size': item.size,
405 } for item in items
406 ],
407 'namespace': self._namespace_dict,
408 }
409
410 query_url = '%s/api/isolateservice/v1/preupload' % self._base_url
411
412 # Response body is a list of push_urls (or null if file is already present).
413 response = None
414 try:
415 response = net.url_read_json(url=query_url, data=body)
416 if response is None:
417 raise isolated_format.MappingError(
418 'Failed to execute preupload query')
419 except ValueError as err:
420 raise isolated_format.MappingError(
421 'Invalid response from server: %s, body is %s' % (err, response))
422
423 # Pick Items that are missing, attach _PushState to them.
424 missing_items = {}
425 for preupload_status in response.get('items', []):
426 assert 'upload_ticket' in preupload_status, (
427 preupload_status, '/preupload did not generate an upload ticket')
428 index = int(preupload_status['index'])
429 missing_items[items[index]] = _IsolateServerPushState(
430 preupload_status, items[index].size)
431 logging.info('Queried %d files, %d cache hit',
432 len(items), len(items) - len(missing_items))
433 return missing_items
434
435 def _do_fetch(self, url, digest, offset):
436 """Fetches isolated data from the URL.
437
438 Used only for fetching files, not for API calls. Can be overridden in
439 subclasses.
440
441 Args:
442 url: URL to fetch the data from, can possibly return http redirect.
443 offset: byte offset inside the file to start fetching from.
444
445 Returns:
446 net.HttpResponse compatible object, with 'read' and 'get_header' calls.
447 """
448 assert isinstance(offset, int)
449 data = {
450 'digest': digest.encode('utf-8'),
451 'namespace': self._namespace_dict,
452 'offset': offset,
453 }
454 # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
455 # is added.
456 return net.url_read_json(
457 url=url,
458 data=data,
459 read_timeout=DOWNLOAD_READ_TIMEOUT)
460
461 def _do_push(self, push_state, content):
462 """Uploads isolated file to the URL.
463
464 Used only for storing files, not for API calls. Can be overridden in
465 subclasses.
466
467 Args:
468 url: URL to upload the data to.
469 push_state: an _IsolateServicePushState instance
470 item: the original Item to be uploaded
471 content: an iterable that yields 'str' chunks.
472 """
473 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
474 # upload support is implemented.
475 if isinstance(content, list) and len(content) == 1:
476 content = content[0]
477 else:
478 content = ''.join(content)
479
480 # DB upload
481 if not push_state.finalize_url:
482 url = '%s/%s' % (self._base_url, push_state.upload_url)
483 content = base64.b64encode(content)
484 data = {
485 'upload_ticket': push_state.preupload_status['upload_ticket'],
486 'content': content,
487 }
488 response = net.url_read_json(url=url, data=data)
489 return response is not None and response['ok']
490
491 # upload to GS
492 url = push_state.upload_url
493 response = net.url_read(
494 content_type='application/octet-stream',
495 data=content,
496 method='PUT',
497 headers={'Cache-Control': 'public, max-age=31536000'},
498 url=url)
499 return response is not None
500
501
502class _IsolateServerGrpcPushState(object):
503 """Empty class, just to present same interface as IsolateServer """
504
505 def __init__(self):
506 pass
507
508
509class IsolateServerGrpc(StorageApi):
510 """StorageApi implementation that downloads and uploads to a gRPC service.
511
512 Limitations: only works for the default-gzip namespace, and with zero offsets
513 while fetching.
514 """
515
aludwin758d2462017-06-29 16:38:50 -0700516 def __init__(self, server, namespace, proxy):
aludwin81178302016-11-30 17:18:49 -0800517 super(IsolateServerGrpc, self).__init__()
518 logging.info('Using gRPC for Isolate')
aludwind5a67882017-08-04 12:58:10 -0700519 # Proxies only support the default-gzip namespace for now.
520 # TODO(aludwin): support other namespaces if necessary
521 assert namespace == 'default-gzip'
aludwin120e37e2017-06-28 13:05:58 -0700522 self._server = server
523 self._lock = threading.Lock()
524 self._memory_use = 0
525 self._num_pushes = 0
526 self._already_exists = 0
aludwind5a67882017-08-04 12:58:10 -0700527 self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
aludwinef01fa32017-06-29 12:01:03 -0700528 self._namespace = namespace
529
aludwin81178302016-11-30 17:18:49 -0800530
531 @property
532 def location(self):
533 return self._server
534
535 @property
536 def namespace(self):
aludwinef01fa32017-06-29 12:01:03 -0700537 return self._namespace
538
539 @property
540 def internal_compression(self):
541 # gRPC natively compresses all messages before transmission.
542 return True
aludwin81178302016-11-30 17:18:49 -0800543
544 def fetch(self, digest, offset=0):
545 # The gRPC APIs only work with an offset of 0
546 assert offset == 0
aludwin120e37e2017-06-28 13:05:58 -0700547 request = bytestream_pb2.ReadRequest()
548 #TODO(aludwin): send the expected size of the item
aludwind5a67882017-08-04 12:58:10 -0700549 request.resource_name = '%s/blobs/%s/0' % (
550 self._proxy.prefix, digest)
aludwinee2faf72016-12-21 10:36:03 -0800551 try:
aludwind5a67882017-08-04 12:58:10 -0700552 for response in self._proxy.get_stream('Read', request):
aludwin120e37e2017-06-28 13:05:58 -0700553 yield response.data
aludwinee2faf72016-12-21 10:36:03 -0800554 except grpc.RpcError as g:
555 logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
556 raise IOError(g)
aludwin81178302016-11-30 17:18:49 -0800557
558 def push(self, item, push_state, content=None):
559 assert isinstance(item, Item)
560 assert item.digest is not None
561 assert item.size is not None
562 assert isinstance(push_state, _IsolateServerGrpcPushState)
563
564 # Default to item.content().
565 content = item.content() if content is None else content
566 guard_memory_use(self, content, item.size)
aludwin120e37e2017-06-28 13:05:58 -0700567 self._num_pushes += 1
aludwin81178302016-11-30 17:18:49 -0800568
569 try:
aludwinbdcd1bb2016-12-08 08:39:53 -0800570 def chunker():
571 # Returns one bit of content at a time
aludwind1e59132016-12-14 08:22:41 -0800572 if (isinstance(content, str)
573 or not isinstance(content, collections.Iterable)):
aludwinbdcd1bb2016-12-08 08:39:53 -0800574 yield content
575 else:
576 for chunk in content:
577 yield chunk
578 def slicer():
579 # Ensures every bit of content is under the gRPC max size; yields
580 # proto messages to send via gRPC.
aludwin120e37e2017-06-28 13:05:58 -0700581 request = bytestream_pb2.WriteRequest()
582 u = uuid.uuid4()
aludwind5a67882017-08-04 12:58:10 -0700583 request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
584 self._proxy.prefix, u, item.digest, item.size)
aludwin120e37e2017-06-28 13:05:58 -0700585 request.write_offset = 0
aludwinbdcd1bb2016-12-08 08:39:53 -0800586 for chunk in chunker():
587 # Make sure we send at least one chunk for zero-length blobs
588 has_sent_anything = False
aludwind1e59132016-12-14 08:22:41 -0800589 while chunk or not has_sent_anything:
aludwinbdcd1bb2016-12-08 08:39:53 -0800590 has_sent_anything = True
aludwin120e37e2017-06-28 13:05:58 -0700591 slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
592 request.data = chunk[:slice_len]
593 if request.write_offset + slice_len == item.size:
594 request.finish_write = True
595 yield request
596 request.write_offset += slice_len
aludwinbdcd1bb2016-12-08 08:39:53 -0800597 chunk = chunk[slice_len:]
598
aludwinede55f22017-07-07 11:45:33 -0700599 response = None
aludwinee2faf72016-12-21 10:36:03 -0800600 try:
aludwind5a67882017-08-04 12:58:10 -0700601 response = self._proxy.call_no_retries('Write', slicer())
aludwinede55f22017-07-07 11:45:33 -0700602 except grpc.RpcError as r:
603 if r.code() == grpc.StatusCode.ALREADY_EXISTS:
aludwin120e37e2017-06-28 13:05:58 -0700604 # This is legit - we didn't check before we pushed so no problem if
605 # it's already there.
606 self._already_exists += 1
607 if self._already_exists % 100 == 0:
608 logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
609 self._already_exists, self._num_pushes,
610 100.0 * self._already_exists / self._num_pushes))
611 else:
aludwinede55f22017-07-07 11:45:33 -0700612 logging.error('gRPC error during push: throwing as IOError (%s)' % r)
613 raise IOError(r)
aludwin120e37e2017-06-28 13:05:58 -0700614 except Exception as e:
615 logging.error('error during push: throwing as IOError (%s)' % e)
616 raise IOError(e)
aludwinee2faf72016-12-21 10:36:03 -0800617
aludwinede55f22017-07-07 11:45:33 -0700618 if response is not None and response.committed_size != item.size:
aludwin120e37e2017-06-28 13:05:58 -0700619 raise IOError('%s/%d: incorrect size written (%d)' % (
620 item.digest, item.size, response.committed_size))
aludwin81178302016-11-30 17:18:49 -0800621
622 finally:
623 with self._lock:
624 self._memory_use -= item.size
625
626 def contains(self, items):
627 """Returns the set of all missing items."""
aludwin120e37e2017-06-28 13:05:58 -0700628 # TODO(aludwin): this isn't supported directly in Bytestream, so for now
629 # assume that nothing is present in the cache.
aludwin81178302016-11-30 17:18:49 -0800630 # Ensure all items were initialized with 'prepare' call. Storage does that.
631 assert all(i.digest is not None and i.size is not None for i in items)
aludwin120e37e2017-06-28 13:05:58 -0700632 # Assume all Items are missing, and attach _PushState to them. The gRPC
633 # implementation doesn't actually have a push state, we just attach empty
634 # objects to satisfy the StorageApi interface.
aludwin81178302016-11-30 17:18:49 -0800635 missing_items = {}
aludwin120e37e2017-06-28 13:05:58 -0700636 for item in items:
aludwin81178302016-11-30 17:18:49 -0800637 missing_items[item] = _IsolateServerGrpcPushState()
aludwin81178302016-11-30 17:18:49 -0800638 return missing_items
639
640
aludwin758d2462017-06-29 16:38:50 -0700641def set_grpc_proxy(proxy):
642 """Sets the StorageApi to use the specified proxy."""
643 global _grpc_proxy
644 assert _grpc_proxy is None
645 _grpc_proxy = proxy
aludwin81178302016-11-30 17:18:49 -0800646
647
648def get_storage_api(url, namespace):
649 """Returns an object that implements low-level StorageApi interface.
650
651 It is used by Storage to work with single isolate |namespace|. It should
652 rarely be used directly by clients, see 'get_storage' for
653 a better alternative.
654
655 Arguments:
656 url: URL of isolate service to use shared cloud based storage.
657 namespace: isolate namespace to operate in, also defines hashing and
658 compression scheme used, i.e. namespace names that end with '-gzip'
659 store compressed data.
660
661 Returns:
662 Instance of StorageApi subclass.
663 """
aludwin758d2462017-06-29 16:38:50 -0700664 if _grpc_proxy is not None:
665 return IsolateServerGrpc(url, namespace, _grpc_proxy)
666 return IsolateServer(url, namespace)