#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import collections
import logging
import os
import re
import sys
import threading
import time
import types
import uuid

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
# Full external requirements are: grpcio, certifi.
try:
  import grpc
  from google import auth as google_auth
  from google.auth.transport import grpc as google_auth_transport_grpc
  from google.auth.transport import requests as google_auth_transport_requests
  from proto import bytestream_pb2
except ImportError as err:
  grpc = None
  bytestream_pb2 = None

# If gRPC is installed, at least give a warning if certifi is not. certifi is
# not used directly anywhere in this module, but if it is missing,
# google.auth.transport will fail with
# https://stackoverflow.com/questions/24973326
certifi = None
if grpc is not None:
  try:
    import certifi
  except ImportError:
    # Could not import certifi; gRPC HTTPS connections may fail. This will be
    # logged in IsolateServerGrpc.__init__, since the logger is not configured
    # at import time.
    pass

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# Stores the gRPC proxy address. Must be set if the storage API class is
# IsolateServerGrpc (call 'set_grpc_proxy').
_grpc_proxy = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread and finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


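# A minimal sketch of a concrete Item, assuming the caller only needs to push
# an in-memory byte string. Real subclasses are defined by higher-level code;
# the name BufferItem below is illustrative, not part of this module.
#
#   class BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(BufferItem, self).__init__(
#           size=len(buf), high_priority=high_priority)
#       self._buf = buf
#
#     def content(self):
#       yield self._buf
#
#   item = BufferItem('hello')
#   item.prepare(hashlib.sha1)  # fills in item.digest

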
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  @property
  def internal_compression(self):
    """True if this class doesn't require external compression.

    If true, callers should not compress items, even if the namespace indicates
    otherwise. Compression will be performed by the StorageApi class.
    """
    return False

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
    """
    raise NotImplementedError()


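# Example (sketch): given a StorageApi instance 'storage_api' and a list of
# prepared Item objects 'items', the typical flow is to query 'contains' and
# then 'push' only the missing entries; downloads stream chunks via 'fetch'.
# All names here are placeholders.
#
#   for missing_item, push_state in storage_api.contains(items).items():
#     storage_api.push(missing_item, push_state)
#   for chunk in storage_api.fetch(some_digest):
#     output_file.write(chunk)

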
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # On 32-bit Python this is one byte less than 512mb. This is to cope with
    # incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is needed for 32-bit Python when uploading very large files.
        # The problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)


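# Typical usage pattern (sketch): reserve the memory before uploading and
# always release the same amount in a 'finally' block, as IsolateServer.push
# and IsolateServerGrpc.push do below.
#
#   guard_memory_use(self, content, item.size)
#   try:
#     ...  # perform the upload
#   finally:
#     with self._lock:
#       self._memory_use -= item.size

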
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
      'compression': 'flate' if namespace.endswith(
          ('-gzip', '-flate')) else '',
      'digest_hash': 'sha-1',
      'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify that the server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
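      # For example, a response to a request with offset=1000 for a 2000 byte
      # file would carry 'Content-Range: bytes 1000-1999/2000'.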
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; there is no need to reupload the contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
              'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
      'items': [
        {
          'digest': item.digest,
          'is_isolated': bool(item.high_priority),
          'size': item.size,
        } for item in items
      ],
      'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
      'digest': digest.encode('utf-8'),
      'namespace': self._namespace_dict,
      'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance as created by 'contains'.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
        'upload_ticket': push_state.preupload_status['upload_ticket'],
        'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace, proxy):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')
    if not certifi:
      logging.warning(
          'Could not import certifi; gRPC HTTPS connections may fail')
    self._server = server
    self._lock = threading.Lock()
    self._memory_use = 0
    self._num_pushes = 0
    self._already_exists = 0

    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces if necessary
    assert namespace == 'default-gzip'
    self._namespace = namespace

    # Make sure grpc was successfully imported
    assert grpc
    assert bytestream_pb2

    roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS')
    overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE')

    # The proxy address must be of the form:
    #   http[s]://<server>[:port][/prefix]
    m = re.search(r'^(https?)://([^/]+)/?(.*)$', proxy)
    if not m:
      raise ValueError(('gRPC proxy must have the form: '
                        'http[s]://<server>[:port][/prefix] '
                        '(given: %s)') % proxy)
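    # For example, a (hypothetical) proxy address of
    # 'https://grpcproxy.example.com:9090/exec' parses to transport 'https',
    # host 'grpcproxy.example.com:9090' and prefix 'exec', which is
    # normalized to 'exec/' below.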
    transport = m.group(1)
    host = m.group(2)
    prefix = m.group(3)
    if not prefix.endswith('/'):
      prefix = prefix + '/'
    logging.info('gRPC proxy: transport %s, host %s, prefix %s',
                 transport, host, prefix)
    self._prefix = prefix

    if transport == 'http':
      self._channel = grpc.insecure_channel(host)
    elif transport == 'https':
      # Using cloud container builder scopes for testing:
      scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
      credentials, _ = google_auth.default(scopes=scopes)
      request = google_auth_transport_requests.Request()
      options = ()
      root_certs = None
      if roots is not None:
        logging.info('Using root CA %s', roots)
        with open(roots) as f:
          root_certs = f.read()
      if overd is not None:
        logging.info('Using TLS server override %s', overd)
        options = (('grpc.ssl_target_name_override', overd),)
      ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
      self._channel = google_auth_transport_grpc.secure_authorized_channel(
          credentials, request, host, ssl_creds, options=options)
    else:
      raise ValueError('unknown transport %s (should be http[s])' % transport)
    self._stub = bytestream_pb2.ByteStreamStub(self._channel)

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    return self._namespace

  @property
  def internal_compression(self):
    # gRPC natively compresses all messages before transmission.
    return True

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = bytestream_pb2.ReadRequest()
    # TODO(aludwin): send the expected size of the item
    request.resource_name = '%sblobs/%s/0' % (
        self._prefix, digest)
    try:
      for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
        yield response.data
    except grpc.RpcError as g:
      logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
      raise IOError(g)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)
    self._num_pushes += 1

    try:
      def chunker():
        # Returns one bit of content at a time
        if (isinstance(content, str)
            or not isinstance(content, collections.Iterable)):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = bytestream_pb2.WriteRequest()
        u = uuid.uuid4()
        request.resource_name = '%suploads/%s/blobs/%s/%d' % (
            self._prefix, u, item.digest, item.size)
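        # Example resource name (illustrative values only):
        #   <prefix>uploads/<uuid4>/blobs/<digest>/<size in bytes>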
        request.write_offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs
          has_sent_anything = False
          while chunk or not has_sent_anything:
            has_sent_anything = True
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data = chunk[:slice_len]
            if request.write_offset + slice_len == item.size:
              request.finish_write = True
            yield request
            request.write_offset += slice_len
            chunk = chunk[slice_len:]

      response = None
      try:
        response = self._stub.Write(slicer())
      except grpc.RpcError as r:
        if r.code() == grpc.StatusCode.ALREADY_EXISTS:
          # This is legit - we didn't check before we pushed so no problem if
          # it's already there.
          self._already_exists += 1
          if self._already_exists % 100 == 0:
            logging.info('unnecessarily pushed %d/%d blobs (%.1f%%)' % (
                self._already_exists, self._num_pushes,
                100.0 * self._already_exists / self._num_pushes))
        else:
          logging.error('gRPC error during push: throwing as IOError (%s)' % r)
          raise IOError(r)
      except Exception as e:
        logging.error('error during push: throwing as IOError (%s)' % e)
        raise IOError(e)

      if response is not None and response.committed_size != item.size:
        raise IOError('%s/%d: incorrect size written (%d)' % (
            item.digest, item.size, response.committed_size))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # TODO(aludwin): this isn't supported directly in Bytestream, so for now
    # assume that nothing is present in the cache.
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    # Assume all Items are missing, and attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state; we just attach empty
    # objects to satisfy the StorageApi interface.
    missing_items = {}
    for item in items:
      missing_items[item] = _IsolateServerGrpcPushState()
    return missing_items


def set_grpc_proxy(proxy):
  """Sets the StorageApi to use the specified proxy."""
  global _grpc_proxy
  assert _grpc_proxy is None
  _grpc_proxy = proxy


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for
  a better alternative.

  Arguments:
    url: URL of the isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  if _grpc_proxy is not None:
    return IsolateServerGrpc(url, namespace, _grpc_proxy)
  return IsolateServer(url, namespace)
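

# Example (sketch): how a caller opts into the gRPC backend; the proxy address
# and server URL below are hypothetical.
#
#   set_grpc_proxy('http://localhost:7777/prefix')
#   storage_api = get_storage_api('https://isolate.example.com', 'default-gzip')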