#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import binascii
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
# Full external requirements are: grpcio, certifi.
try:
  import grpc
  from google import auth as google_auth
  from google.auth.transport import grpc as google_auth_transport_grpc
  from google.auth.transport import requests as google_auth_transport_requests
  from proto import bytestream_pb2
except ImportError as err:
  grpc = None
  bytestream_pb2 = None

# If gRPC is installed, at least give a warning if certifi is not. This is not
# actually used anywhere in this module, but if certifi is missing,
# google.auth.transport will fail with
# https://stackoverflow.com/questions/24973326
if grpc is not None:
  try:
    import certifi
  except ImportError as err:
    logging.warning('could not import certifi; gRPC HTTPS connections may fail')

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total

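# As an illustrative sketch only (not part of this module's API): a minimal
# Item subclass that wraps an in-memory byte string. content() yields the
# whole buffer as a single chunk; digest is filled in later by prepare().
#
#   class BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(BufferItem, self).__init__(
#           digest=None, size=len(buf), high_priority=high_priority)
#       self._buf = buf
#
#     def content(self):
#       return [self._buf]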

class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()

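# A rough end-to-end sketch of how a StorageApi implementation is driven
# (illustrative only; 'storage_api', 'items' and 'hash_algo' are hypothetical
# locals, and the real Storage class adds compression and threading on top):
#
#   for item in items:
#     item.prepare(hash_algo)
#   missing = storage_api.contains(items)
#   for item, push_state in missing.items():
#     storage_api.push(item, push_state)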

class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is needed for 32-bit Python when uploading very large files.
        # The problem is that it compares uncompressed sizes, while we care
        # about compressed sizes since that is what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling to one upload at a time, we can survive. Otherwise,
        # kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)

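# Typical usage pattern inside a StorageApi.push() implementation (a sketch of
# how the classes below use this helper, not a prescribed API):
#
#   guard_memory_use(self, content, size)
#   try:
#     ... serialize and upload |content| ...
#   finally:
#     with self._lock:
#       self._memory_use -= size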

class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid a memcpy of the (possibly huge) file, until
    # streaming upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0

    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces if necessary
    assert namespace == 'default-gzip'
    self._namespace = namespace

    # Make sure grpc was successfully imported
    assert grpc
    assert bytestream_pb2

    roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS')
    overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE')

    # The proxy argument must be of the form:
    #   http[s]://<server>[:port][/prefix]
    m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
    if not m:
      raise ValueError(('gRPC proxy must have the form: '
                        'http[s]://<server>[:port][/prefix] '
                        '(given: %s)') % proxy)
    transport = m.group(1)
    host = m.group(2)
    prefix = m.group(3)
    if not prefix.endswith('/'):
      prefix = prefix + '/'
    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
                 transport, host, prefix)
    self._prefix = prefix

    if transport == 'http':
      self._channel = grpc.insecure_channel(host)
    elif transport == 'https':
      # Using cloud container builder scopes for testing:
      scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
      credentials, _ = google_auth.default(scopes=scopes)
      request = google_auth_transport_requests.Request()
      options = ()
      root_certs = None
      if roots is not None:
        logging.info('Using root CA %s', roots)
        with open(roots) as f:
          root_certs = f.read()
      if overd is not None:
        logging.info('Using TLS server override %s', overd)
        options = (('grpc.ssl_target_name_override', overd),)
      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
      self._channel = google_auth_transport_grpc.secure_authorized_channel(
          credentials, request, host, ssl_creds, options=options)
    else:
      raise ValueError('unknown transport %s (should be http[s])' % transport)
    self._stub = bytestream_pb2.ByteStreamStub(self._channel)

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    # TODO(aludwin): send the expected size of the item
    request.resource_name = '%sblobs/%s/0' % (
        self._prefix, digest)
    try:
      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Returns one bit of content at a time
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%suploads/%s/blobs/%s/%d' % (
            self._prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      response = None
      try:
        response = self._stub.Write(slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state, we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)
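

# A minimal usage sketch (illustrative only; the server and proxy URLs are
# hypothetical, and real callers normally go through the higher-level Storage
# class rather than using a StorageApi directly):
#
#   set_grpc_proxy('https://grpcproxy.example.com/prefix')  # only if proxying
#   storage_api = get_storage_api(
#       'https://isolate.example.com', 'default-gzip')
#   for chunk in storage_api.fetch('deadbeef' * 5):
#     ...  # consume the str chunk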