#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""A low-level blob storage/retrieval interface to the Isolate server"""

import base64
import binascii
import collections
import logging
import re
import sys
import threading
import time
import types

from utils import file_path
from utils import net

import isolated_format

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
try:
  import grpc
  from proto import isolate_bot_pb2
except ImportError:
  grpc = None
  isolate_bot_pb2 = None


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total
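
# Illustrative sketch (not part of the original module): a minimal Item
# subclass serving a byte string that is already held in memory. The name
# 'BufferItem' and its fields are hypothetical.
#
#   class BufferItem(Item):
#     def __init__(self, buf, high_priority=False):
#       super(BufferItem, self).__init__(None, len(buf), high_priority)
#       self.buffer = buf
#
#     def content(self):
#       return [self.buffer]
#
# After prepare(hashlib.sha1) is called (assuming hashlib is imported),
# item.digest holds the hex digest of the buffer and item.size its length.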


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the doc string of 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = 'api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = 'api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


def guard_memory_use(server, content, size):
  """Guards a server against using excessive memory while uploading.

  The server needs to contain a _memory_use int and a _lock mutex
  (both IsolateServer and IsolateServerGrpc qualify); this function
  then uses those values to track memory usage in a thread-safe way.

  If a request would cause the memory usage to exceed a safe maximum,
  this function sleeps in 0.1s increments until memory usage falls
  below the maximum.
  """
  if isinstance(content, (basestring, list)):
    # Memory is already used, too late.
    with server._lock:
      server._memory_use += size
  else:
    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.
    #
    # In theory, we should keep the generator, so that it is not serialized in
    # memory. Sadly net.HttpService.request() requires the body to be
    # serialized.
    assert isinstance(content, types.GeneratorType), repr(content)
    slept = False
    # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
    # One byte less than 512mb. This is to cope with incompressible content.
    max_size = int(sys.maxsize * 0.25)
    while True:
      with server._lock:
        # This is due to 32-bit Python when uploading very large files. The
        # problem is that it's comparing uncompressed sizes, while we care
        # about compressed sizes since it's what is serialized in memory.
        # The first check assumes large files are compressible and that by
        # throttling one upload at once, we can survive. Otherwise, kaboom.
        memory_use = server._memory_use
        if ((size >= max_size and not memory_use) or
            (memory_use + size <= max_size)):
          server._memory_use += size
          memory_use = server._memory_use
          break
      time.sleep(0.1)
      slept = True
    if slept:
      logging.info('Unblocked: %d %d', memory_use, size)
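
# Both push() implementations below follow the same pattern around this guard:
# reserve the (possibly compressed) size up front, then release it in a
# 'finally' clause once the upload attempt is over. A minimal sketch:
#
#   guard_memory_use(self, content, size)
#   try:
#     ...  # serialize |content| and send it to the server
#   finally:
#     with self._lock:
#       self._memory_use -= size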


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads to and downloads from Google Storage directly whenever
  appropriate. Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self._do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exists: %s / %s.' % (
            source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    guard_memory_use(self, content, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self._do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
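
  # For reference, the /preupload response is consumed above as if it looked
  # roughly like this (values are made up; only the keys this module actually
  # reads are shown). Entries come back only for missing items, and 'index'
  # points into the request's 'items' list:
  #
  #   {
  #     'items': [
  #       {
  #         'index': '0',
  #         'upload_ticket': '<opaque ticket>',
  #         'gs_upload_url': 'https://storage.googleapis.com/...',  # optional
  #       },
  #     ],
  #   }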

  def _do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def _do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance; its upload_url is the
        URL to upload the data to.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid a memcpy of the (possibly huge) file, until
    # streaming upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class _IsolateServerGrpcPushState(object):
  """Empty class, just to present the same interface as IsolateServer."""

  def __init__(self):
    pass


class IsolateServerGrpc(StorageApi):
  """StorageApi implementation that downloads and uploads to a gRPC service.

  Limitations: only works for the default-gzip namespace, and with zero offsets
  while fetching.
  """

  def __init__(self, server, namespace):
    super(IsolateServerGrpc, self).__init__()
    logging.info('Using gRPC for Isolate')

    # Make sure grpc was successfully imported
    assert grpc
    assert isolate_bot_pb2

    # Proxies only support the default-gzip namespace for now.
    # TODO(aludwin): support other namespaces
    assert namespace == 'default-gzip'
    self._server = server
    self._channel = grpc.insecure_channel(server)
    self._stub = isolate_bot_pb2.FileServiceStub(self._channel)
    self._lock = threading.Lock()
    self._memory_use = 0
    logging.info('...gRPC successfully initialized')

  @property
  def location(self):
    return self._server

  @property
  def namespace(self):
    # This is used to determine if the data is compressed, but gRPC proxies
    # don't have concepts of 'namespaces' and natively compress all messages
    # before transmission. So return an unlikely-to-be-used name so that
    # isolateserver doesn't try to compress anything.
    return 'grpc-proxy'

  def fetch(self, digest, offset=0):
    # The gRPC APIs only work with an offset of 0
    assert offset == 0
    request = isolate_bot_pb2.FetchBlobsRequest()
    req_digest = request.digest.add()
    # Convert the utf-8 encoded hexadecimal string (like '012abc') to a byte
    # array (like [0x01, 0x2a, 0xbc]).
    req_digest.digest = binascii.unhexlify(digest)
    expected_offset = 0
    for response in self._stub.FetchBlobs(request):
      if not response.status.succeeded:
        raise IOError(
            'Error while fetching %s: %s' % (digest, response.status))
      if expected_offset != response.data.offset:
        raise IOError(
            'Error while fetching %s: expected offset %d, got %d' % (
                digest, expected_offset, response.data.offset))
      expected_offset += len(response.data.data)
      yield response.data.data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerGrpcPushState)

    # Default to item.content().
    content = item.content() if content is None else content
    guard_memory_use(self, content, item.size)

    try:
      def chunker():
        # Returns one bit of content at a time
        if not isinstance(content, collections.Iterable):
          yield content
        else:
          for chunk in content:
            yield chunk
      def slicer():
        # Ensures every bit of content is under the gRPC max size; yields
        # proto messages to send via gRPC.
        request = isolate_bot_pb2.PushBlobsRequest()
        request.data.digest.digest = binascii.unhexlify(item.digest)
        request.data.digest.size_bytes = item.size
        request.data.offset = 0
        for chunk in chunker():
          # Make sure we send at least one chunk for zero-length blobs
          has_sent_anything = False
          while chunk or not has_sent_anything:
            slice_len = min(len(chunk), NET_IO_FILE_CHUNK)
            request.data.data = chunk[:slice_len]
            yield request
            has_sent_anything = True
            request.data.offset += slice_len
            # The proxy only expects the first chunk to have the digest
            request.data.ClearField("digest")
            chunk = chunk[slice_len:]

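      # For example (illustrative numbers): a 40 KiB blob arriving as a single
      # chunk is yielded by slicer() as three PushBlobsRequest messages with
      # offsets 0, 16384 and 32768, each carrying at most NET_IO_FILE_CHUNK
      # (16 KiB) of data, and only the first one carrying the digest. A
      # zero-length blob (e.g. content() returning ['']) still yields a
      # single, empty message.
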
      # TODO(aludwin): batch up several requests to reuse TCP connections
      response = self._stub.PushBlobs(slicer())
      if not response.status.succeeded:
        raise IOError(
            'Error while uploading %s: %s' % (
                item.digest, response.status.error_detail))

    finally:
      with self._lock:
        self._memory_use -= item.size

  def contains(self, items):
    """Returns the set of all missing items."""
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)
    request = isolate_bot_pb2.ContainsRequest()
    items_by_digest = {}
    for item in items:
      cd = request.digest.add()
      cd.digest = binascii.unhexlify(item.digest)
      items_by_digest[cd.digest] = item
    response = self._stub.Contains(request)

    # If everything's present, return the empty set.
    if response.status.succeeded:
      return {}

    if response.status.error != isolate_bot_pb2.BlobStatus.MISSING_DIGEST:
      raise IOError('Unknown response during lookup: %s' % response.status)

    # Pick Items that are missing, attach _PushState to them. The gRPC
    # implementation doesn't actually have a push state, we just attach
    # empty objects to satisfy the StorageApi interface.
    missing_items = {}
    for missing in response.status.missing_digest:
      item = items_by_digest[missing.digest]
      missing_items[item] = _IsolateServerGrpcPushState()

    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items


def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)
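

# Illustrative sketch (not part of the original module): typical use of this
# interface, mirroring the usage described in StorageApi.push(). The server
# URL below is a made-up example, and hashlib.sha1 is assumed to match the
# namespace's hashing scheme.
#
#   storage_api = get_storage_api('https://isolateserver.example.com',
#                                 'default-gzip')
#   for item in all_items:
#     item.prepare(hashlib.sha1)
#   for missing_item, push_state in storage_api.contains(all_items).items():
#     storage_api.push(missing_item, push_state)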