#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import binascii
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
# Full external requirements are: grpcio, certifi.
try:
  import grpc
  from google import auth as google_auth
  from google.auth.transport import grpc as google_auth_transport_grpc
  from google.auth.transport import requests as google_auth_transport_requests
  from proto import bytestream_pb2
except ImportError as err:
  grpc = None
  bytestream_pb2 = None

# If gRPC is installed, at least give a warning if certifi is not. This is not
# actually used anywhere in this module, but if certifi is missing,
# google.auth.transport will fail with
# https://stackoverflow.com/questions/24973326
if grpc is not None:
  try:
    import certifi
  except ImportError as err:
    logging.warning('could not import certifi; gRPC HTTPS connections may fail')

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download is aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The class used to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread and finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


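
# Illustrative sketch only (not part of this module): a minimal in-memory Item
# subclass whose content is a single byte string chunk. The class name and
# constructor are hypothetical; they merely show what content() and prepare()
# expect from a concrete Item.
#
#   class BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(BufferItem, self).__init__(None, len(buf), high_priority)
#       self._buffer = buf
#
#     def content(self):
#       return [self._buffer]
#
#   item = BufferItem('hello')
#   item.prepare(hashlib.sha1)  # sets item.digest; assumes hashlib is imported

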
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      offset: offset (in bytes) from the start of the file to resume the fetch
          from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from the |content| generator when retrying a
    # push. If |content| is indeed a generator, it cannot be rewound back to
    # the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching,
    # restartable generator. It should be done alongside the streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512MB on 32-bit Python. This is to cope with
    # incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since that is what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling to one upload at a time, we can survive. Otherwise,
        # kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)


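
# Illustrative usage sketch (hypothetical caller, not part of this module):
# guard_memory_use() only accounts for the memory about to be consumed, so the
# caller is responsible for releasing it once the upload finishes, exactly as
# IsolateServer.push() and IsolateServerGrpc.push() do below.
#
#   guard_memory_use(server, content, size)
#   try:
#     ...  # perform the upload of |content|
#   finally:
#     with server._lock:
#       server._memory_use -= size

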
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by the /server_details
      endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that the server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset| is set, verify the server respects it by checking
    # Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure the returned offset equals the requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure the entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; there is no need to reupload the contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate the MD5 or CRC32C sum while uploading a file
        # and send it to the isolate server. That way the isolate server can
        # verify that the data safely reached Google Storage (GS provides MD5
        # and CRC32C of stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with a 'prepare' call. Storage does
    # that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if the file is already
    # present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick the Items that are missing, attach a _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return an http redirect.
      digest: hash digest of the item to download.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads an isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid a memcpy of the (possibly huge) file, until
    # streaming upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0

    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces if necessary.
    assert namespace == 'default-gzip'
    self._namespace = namespace

    # Make sure grpc was successfully imported.
    assert grpc
    assert bytestream_pb2

    proxy = os.environ.get('ISOLATED_GRPC_PROXY', '')
    roots = os.environ.get('ISOLATED_GRPC_PROXY_TLS_ROOTS')
    overd = os.environ.get('ISOLATED_GRPC_PROXY_TLS_OVERRIDE')

    # The "proxy" envvar must be of the form:
    # http[s]://<server>[:port][/prefix]
    m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
    if not m:
      raise ValueError(('gRPC proxy must have the form: '
                        'http[s]://<server>[:port][/prefix] '
                        '(given: %s)') % proxy)
    transport = m.group(1)
    host = m.group(2)
    prefix = m.group(3)
    if not prefix.endswith('/'):
      prefix = prefix + '/'
    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
                 transport, host, prefix)
    self._prefix = prefix

    if transport == 'http':
      self._channel = grpc.insecure_channel(host)
    elif transport == 'https':
      # Using cloud container builder scopes for testing:
      scopes = ('https://www.googleapis.com/auth/cloud-build-service',)
      credentials, _ = google_auth.default(scopes=scopes)
      request = google_auth_transport_requests.Request()
      options = ()
      root_certs = None
      if roots is not None:
        logging.info('Using root CA %s', roots)
        with open(roots) as f:
          root_certs = f.read()
      if overd is not None:
        logging.info('Using TLS server override %s', overd)
        options = (('grpc.ssl_target_name_override', overd),)
      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
      self._channel = google_auth_transport_grpc.secure_authorized_channel(
          credentials, request, host, ssl_creds, options=options)
    else:
      raise ValueError('unknown transport %s (should be http[s])' % transport)
    self._stub = bytestream_pb2.ByteStreamStub(self._channel)

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    # TODO(aludwin): send the expected size of the item.
    request.resource_name = '%sblobs/%s/0' % (
        self._prefix, digest)
    try:
      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Yields the content one chunk at a time.
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every chunk of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%suploads/%s/blobs/%s/%d' % (
            self._prefix, u, item.digest, item.size)
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one request for zero-length blobs.
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      try:
        response = self._stub.Write(slicer())
      except grpc.Call as c:
        # You might think that errors from gRPC would be grpc.RpcError. You'd
        # be... right... but it's *also* an instance of grpc.Call, and that's
        # where the status code actually lives.
        if c.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed, so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % c)
          raise IOError(c)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with a 'prepare' call. Storage does
    # that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach a _PushState to them. The gRPC
    # implementation doesn't actually have a push state; we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_storage_api_class(cls):
  """Replaces the StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of the isolate service to use as shared cloud-based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
      compression scheme used, e.g. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)
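

# Example usage (illustrative only; the server URL is a placeholder and
# 'items' is assumed to be a list of prepared Item subclasses), mirroring the
# contains/push pattern documented in StorageApi.push:
#
#   storage_api = get_storage_api('https://isolate.example.com', 'default-gzip')
#   for missing_item, push_state in storage_api.contains(items).items():
#     storage_api.push(missing_item, push_state)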