#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import binascii
import logging
import re
import sys
import threading
import time
import types

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
try:
  import grpc
  from proto import isolate_bot_pb2
except ImportError:
  grpc = None
  isolate_bot_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The StorageApi class used to communicate with the server by default. Can be
# changed by 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in the main thread, travels to
  the 'contains' thread, then to the 'push' thread and then finally back to the
  main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


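# Illustrative sketch only: a minimal in-memory Item subclass showing how
# content() and prepare() fit together. The name _BufferItem is hypothetical
# and is not part of this module; real subclasses are defined by higher-level
# code. Assumes 'import hashlib'.
#
#   class _BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(_BufferItem, self).__init__(None, len(buf), high_priority)
#       self._buf = buf
#
#     def content(self):
#       return [self._buf]
#
#   item = _BufferItem('hello')
#   item.prepare(hashlib.sha1)  # Fills in item.digest and item.size.

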
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      offset: offset (in bytes) from the start of the file to resume the fetch
          from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as the source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about the item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
    """
    raise NotImplementedError()


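# Illustrative sketch only: how a caller typically consumes StorageApi.fetch(),
# writing the yielded chunks to a local file. 'storage_api', 'digest' and
# 'path' are hypothetical placeholders; the real download logic lives in the
# higher-level Storage code.
#
#   with open(path, 'wb') as f:
#     for chunk in storage_api.fetch(digest):
#       f.write(chunk)

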
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from the |content| generator when retrying a
    # push. If |content| is indeed a generator, it cannot be rewound to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching,
    # restartable generator. It should be done alongside the streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is needed on 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since that is what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling to one upload at a time, we can survive. Otherwise,
        # kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
            source_url, self._namespace, digest))

    # For DB uploads.
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # For GS entities.
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset| is set, verify the server respects it by checking
    # Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure the returned offset equals the requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure the entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below; no need to re-upload the contents in that case.
      if not push_state.uploaded:
        # PUT the file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate the MD5 or CRC32C sum while uploading a file
        # and send it to the isolate server. That way the isolate server can
        # verify that the data safely reached Google Storage (GS provides MD5
        # and CRC32C of stored files).
        # TODO(maruel): Fix the server to properly accept data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
              'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from; may return an HTTP redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once an HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads an isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid a memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload.
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
        'upload_ticket': push_state.preupload_status['upload_ticket'],
        'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # Upload to GS.
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')

    # Make sure grpc was successfully imported.
    assert grpc
    assert isolate_bot_pb2

    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces
    assert namespace == 'default-gzip'
    self._server = server
    self._channel = grpc.insecure_channel(server)
    self._stub = isolate_bot_pb2.FileServiceStub(self._channel)
    self._lock = threading.Lock()
    self._memory_use = 0
    logging.info('...gRPC successfully initialized')

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    # This is used to determine if the data is compressed, but gRPC proxies
    # don't have concepts of 'namespaces' and natively compress all messages
    # before transmission. So return an unlikely-to-be-used name so that
    # isolateserver doesn't try to compress anything.
    return 'grpc-proxy'

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0.
    assert offset == 0
    request = isolate_bot_pb2.FetchBlobsRequest()
    req_digest = request.digest.add()
    # Convert the utf-8 encoded hexadecimal string (like '012abc') to a byte
    # array (like [0x01, 0x2a, 0xbc]).
    req_digest.digest = binascii.unhexlify(digest)
    expected_offset = 0
    for response in self._stub.FetchBlobs(request):
      if not response.status.succeeded:
        raise IOError(
            'Error while fetching %s: %s' % (
                digest, response.status.error_detail))
      if expected_offset != response.data.offset:
        raise IOError(
            'Error while fetching %s: expected offset %d, got %d' % (
                digest, expected_offset, response.data.offset))
      # Track how much has been received so the next chunk's offset can be
      # validated.
      expected_offset += len(response.data.data)
      yield response.data.data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)

    try:
      # A cheesy way to avoid a memcpy of a (possibly huge) file, until
      # streaming upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      request = isolate_bot_pb2.PushBlobsRequest()
      request.data.digest.digest = binascii.unhexlify(item.digest)
      request.data.digest.size_bytes = item.size
      request.data.data = content
      # There's only one request, but the streaming API expects an iterable
      # like a list.
      # TODO(aludwin): batch up several requests to reuse TCP connections
      # TODO(aludwin): allow larger blobs to be broken up into chunks
      requests = [request]
      response = self._stub.PushBlobs(requests)
      if not response.status.succeeded:
        raise IOError(
            'Error while uploading %s: %s' % (
                item.digest, response.status.error_detail))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    request = isolate_bot_pb2.ContainsRequest()
    items_by_digest = {}
    for item in items:
      cd = request.digest.add()
      cd.digest = binascii.unhexlify(item.digest)
      items_by_digest[cd.digest] = item
    response = self._stub.Contains(request)

    # If everything's present, return the empty set.
    if response.status.succeeded:
      return {}

    if response.status.error != isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
      raise IOError('Unknown response during lookup: %s' % response.status)

    # Pick Items that are missing, attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state; we just attach
    # empty objects to satisfy the StorageApi interface.
    missing_items = {}
    for missing in response.status.missing_digest:
      item = items_by_digest[missing.digest]
      missing_items[item] = _IsolateServerGrpcPushState()

    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items


def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of the isolate service to use as shared cloud-based storage.
    namespace: isolate namespace to operate in; also defines the hashing and
      compression scheme used, e.g. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)
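
# Illustrative sketch only: obtaining a StorageApi instance and checking which
# items still need uploading. The URL, namespace and 'items' list below are
# placeholders, and 'hashlib' is assumed to be imported; most clients should go
# through the higher-level 'get_storage' instead.
#
#   storage_api = get_storage_api('https://isolate.example.com', 'default-gzip')
#   for item in items:
#     item.prepare(hashlib.sha1)
#   for missing_item, push_state in storage_api.contains(items).items():
#     storage_api.push(missing_item, push_state)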