gs_archive_server: modify extract rpc to return bin file data
The extract RPC in gs_archive_server was originally written to return
text file data. This usage is deprecated due to high-level design
changes. This CL updates this RPC to return bin data extracted from a
tar file.
BUG=chromium:1077131
TEST=Manually tested on chromeos2-devservertest
Change-Id: I0c142f645c15285ad00c0c0edcc1d88dc69cf2df
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/dev-util/+/2207529
Tested-by: Sanika Kulkarni <sanikak@chromium.org>
Commit-Queue: Sanika Kulkarni <sanikak@chromium.org>
Auto-Submit: Sanika Kulkarni <sanikak@chromium.org>
Reviewed-by: Congbin Guo <guocb@chromium.org>
diff --git a/gs_cache/gs_archive_server.py b/gs_cache/gs_archive_server.py
index 55513f3..3467965 100644
--- a/gs_cache/gs_archive_server.py
+++ b/gs_cache/gs_archive_server.py
@@ -34,11 +34,10 @@
import urllib
import urlparse
-import cherrypy
import requests
+import cherrypy
import constants
-import range_response
import tarfile_utils
from chromite.lib import cros_logging as logging
from chromite.lib import gs
@@ -115,18 +114,13 @@
Raises:
Raise HTTP 400 error if no valid parameter in |all_params|.
"""
- result = set()
try:
value = all_params[param_name]
except KeyError:
raise cherrypy.HTTPError(httplib.BAD_REQUEST,
'Parameter "%s" is required!' % param_name)
- if isinstance(value, list):
- result |= set(value)
- else:
- result.add(value)
- return result
+ return set(value) if isinstance(value, list) else {value}
def _to_cherrypy_error(func):
@@ -148,20 +142,19 @@
return func_wrapper
-def _search_lines_by_pattern(all_lines, patterns):
+def _search_lines_by_pattern(all_lines, pattern):
"""Search plain text lines which matches one of shell style glob |patterns|.
Args:
all_lines: A list or an iterable object of all plain text lines to be
searched.
- patterns: A list of pattern that the target lines matched.
+ pattern: A pattern that the target lines matched.
Returns:
A set of found lines.
"""
found_lines = set()
- for pattern in patterns:
- found_lines |= set(fnmatch.filter(all_lines, pattern))
+ found_lines |= set(fnmatch.filter(all_lines, pattern))
return found_lines
@@ -259,6 +252,10 @@
return self._call('list_member', path, headers=headers)
+class GsArchiveServerError(Exception):
+ """Standard exception class for GsArchiveServer."""
+
+
class GsArchiveServer(object):
"""The backend of Google Storage Cache server."""
@@ -412,7 +409,10 @@
'/'.join(args),
ext_names=['.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tar.xz'])
files = _safe_get_param(kwargs, 'file')
- _log('Extracting "%s" from "%s".', files, archive)
+ if len(files) != 1:
+ raise GsArchiveServerError('Cannot extract more than one file at a time.')
+ file_to_be_extracted = files.pop()
+ _log('Extracting "%s" from "%s".', file_to_be_extracted, archive)
archive_basename, archive_extname = os.path.splitext(archive)
headers = cherrypy.request.headers.copy()
@@ -434,29 +434,26 @@
else:
decompressed_archive_name = archive_basename
- return self._extract_files_from_tar(files, decompressed_archive_name,
- headers)
+ return self._extract_file_from_tar(file_to_be_extracted,
+ decompressed_archive_name, headers)
- def _extract_files_from_tar(self, files, archive, headers=None):
- """Extract files from |archive| with http headers |headers|."""
+ def _extract_file_from_tar(self, target_file, archive, headers=None):
+ """Extract file from |archive| with http headers |headers|."""
# Call `list_member` and search |filename| in it. If found, create another
# "Range Request" to download that range of bytes.
all_files = self._caching_server.list_member(archive, headers=headers)
- # The format of each line is '<filename>,<data1>,<data2>...'. And the
- # filename is encoded by URL percent encoding, so no ',' in it. Thus
- # we can match the line using the pattern of '<input pattern>,*'
- target_files = ['%s,*' % urllib.unquote(f) for f in files]
-
# Loading the file list into memory doesn't consume too much memory (usually
# just a few MBs), but which is very helpful for us to search.
found_lines = _search_lines_by_pattern(
list(all_files.iter_lines(chunk_size=constants.READ_BUFFER_SIZE_BYTES)),
- target_files
+ '%s,*' % target_file
)
if not found_lines:
- return '{}'
+ _log('No matching files found for %s.', target_file,
+ level=logging.WARNING)
+ yield None
# Too many ranges may result in error of 'request header too long'. So we
# split the files into chunks and request one by one.
@@ -466,14 +463,11 @@
for line in found_lines],
_MAX_RANGES_PER_REQUEST)
- streamer = range_response.JsonStreamer()
for part_of_found_files in found_files:
ranges = [(int(f.content_start), int(f.content_start) + int(f.size) - 1)
for f in part_of_found_files]
rsp = self._send_range_request(archive, ranges, headers)
- streamer.queue_response(rsp, part_of_found_files)
-
- return streamer.stream()
+ yield rsp.content
def _send_range_request(self, archive, ranges, headers):
"""Create and send a "Range Request" to caching server.
diff --git a/gs_cache/range_response.py b/gs_cache/range_response.py
deleted file mode 100644
index c8c4dac..0000000
--- a/gs_cache/range_response.py
+++ /dev/null
@@ -1,294 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2018 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""This module provides utils to handle response of "Range Request"."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import collections
-import itertools
-import json
-import re
-
-import constants
-
-_RANGE_HEADER_SEPARATORS = re.compile('[-/ ]')
-_ONE_LINE = object() # Special object to indicate data reader to read one line.
-
-_ContentRangeHeader = collections.namedtuple('_ContentRangeHeader',
- ('bytes', 'start', 'end', 'total'))
-
-
-class FormatError(Exception):
- """Exception raised when we parse wrong format of response."""
-
-
-class NoFileFoundError(Exception):
- """Exception raised when we cannot get a file match the range."""
-
-
-class ResponseQueueError(Exception):
- """Exception raised when trying to queue responses not allowed."""
-
-
-def _get_file_by_range_header(range_header_str, file_name_map):
- """Get file name and size by the Content-Range header.
-
- The format of Content-Range header is like:
- Content-Range: bytes <start>-<end>/<total>
- We get the <start> and <end> from it and retrieve the file name from
- |file_name_map|.
-
- Args:
- range_header_str: A string of range header.
- file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
-
- Returns:
- A tuple of (filename, size).
-
- Raises:
- FormatError: Raised when response content interrupted.
- NoFileFoundError: Raised when we cannot get a file matches the range.
- """
- # Split the part of 'Content-Range:' first if needed.
- if range_header_str.lower().startswith('content-range:'):
- range_header_str = range_header_str.split(': ', 1)[1]
-
- try:
- range_header = _ContentRangeHeader._make(
- _RANGE_HEADER_SEPARATORS.split(range_header_str)
- )
- size = int(range_header.end) - int(range_header.start) + 1
- except (IndexError, ValueError):
- raise FormatError('Wrong format of content range header: %s' %
- range_header_str)
-
- try:
- filename = file_name_map[(range_header.start, size)]
- except KeyError:
- raise NoFileFoundError('Cannot find a file matches the range %s' %
- range_header_str)
-
- return filename, size
-
-
-class JsonStreamer(object):
- """A class to stream the responses for range requests.
-
- The class accepts responses and format the file content in all of them as a
- JSON stream. The format:
- '{"<filename>": "<content>", "<filename>": "<content>", ...}'
- """
-
- def __init__(self):
- self._files_iter_list = []
- self._can_add_more_response = True
-
- def queue_response(self, response, file_info_list):
- """Add a reponse to the queue to be streamed as JSON.
-
- We can add either:
- 1. one and only one response for single-part range requests, or
- 2. a series of responses for multi-part range requests.
-
- Args:
- response: An instance of requests.Response, which may be the response of a
- single range request, or a multi-part range request.
- file_info_list: A list of tarfile_utils.TarMemberInfo. We use it to look
- up file name by content start offset and size.
-
- Raises:
- FormatError: Raised when response to be queued isn't for a range request.
- ResponseQueueError: Raised when either queuing more than one response for
- single-part range request, or mixed responses for single-part and
- multi-part range request.
- """
- if not self._can_add_more_response:
- raise ResponseQueueError(
- 'No more reponses can be added when there was a response for '
- 'single-part range request in the queue!')
-
- file_name_map = {(f.content_start, int(f.size)): f.filename
- for f in file_info_list}
-
- # Check if the response is for single range, or multi-part range. For a
- # single range request, the response must have header 'Content-Range'. For a
- # multi-part ranges request, the Content-Type header must be like
- # 'multipart/byteranges; ......'.
- content_range = response.headers.get('Content-Range', None)
- content_type = response.headers.get('Content-Type', '')
-
- if content_range:
- if self._files_iter_list:
- raise ResponseQueueError(
- 'Cannot queue more than one responses for single-part range '
- 'request, or mix responses for single-part and multi-part.')
- filename, _ = _get_file_by_range_header(content_range, file_name_map)
- self._files_iter_list = [iter([(filename, response.content)])]
- self._can_add_more_response = False
-
- elif content_type.startswith('multipart/byteranges;'):
- self._files_iter_list.append(
- _file_iterator(response, file_name_map))
-
- else:
- raise FormatError('The response is not for a range request.')
-
- def stream(self):
- """Yield the series of responses content as a JSON stream.
-
- Yields:
- A JSON stream in format described above.
- """
- files_iter = itertools.chain(*self._files_iter_list)
-
- json_encoder = json.JSONEncoder()
- filename, content = next(files_iter)
- yield '{%s: %s' % (json_encoder.encode(filename),
- json_encoder.encode(content))
- for filename, content in files_iter:
- yield ', %s: %s' % (json_encoder.encode(filename),
- json_encoder.encode(content))
- yield '}'
-
-
-def _data_reader(data_iter):
- """A coroutine to read data from |data_iter|.
-
- It accepts two type of parameter:
- 1. _ONE_LINE: Read one CRLF ended line if possible.
- 2. An integer N: Read at most N bytes.
-
- Args:
- data_iter: An iterator of data source.
-
- Yields:
- The data read.
- """
- buffered = next(data_iter)
-
- # Get what to be read in runtime by passing value into the generator. See
- # https://docs.python.org/2.5/whatsnew/pep-342.html for syntax details.
- to_be_read = yield
-
- while True:
- if to_be_read is _ONE_LINE:
- parts = buffered.split('\r\n', 1)
- if len(parts) == 2:
- line, buffered = parts
- to_be_read = (yield line)
- continue
-
- else: # Read at most |to_be_read| bytes of data.
- bytes_remaining = to_be_read - len(buffered)
- if bytes_remaining < 0:
- read_bytes = buffered[:bytes_remaining]
- buffered = buffered[bytes_remaining:]
- to_be_read = (yield read_bytes)
- continue
-
- try:
- buffered += next(data_iter)
- except StopIteration:
- break
-
- if buffered:
- yield buffered
-
-
-def _read_line(reader):
- """Read one CRLF ended line from the response.
-
- Returns:
- The line read. Return None if nothing to read.
- """
- return reader.send(_ONE_LINE)
-
-
-def _read_empty_line(reader):
- """Read one line and assert it is empty."""
- try:
- line = _read_line(reader)
- except StopIteration:
- raise FormatError('Expect an empty line, but got EOF.')
- if line:
- raise FormatError('Expect an empty line, but got "%s".' % line)
-
-
-def _read_bytes(reader, max_bytes):
- """Read at most |max_bytes| bytes from the reader.
-
- Args:
- reader:
- max_bytes: An integer of maximum bytes of bytes to read.
-
- Returns:
- The bytes read. Return None if nothing to read.
- """
- return reader.send(max_bytes)
-
-
-def _file_iterator(response, file_name_map):
- """The iterator of files in a response of multi-part range request.
-
- An example response is like:
-
- HTTP/1.1 206 Partial Content
- Content-Type: multipart/byteranges; boundary=magic_string
- Content-Length: 282
-
- --magic_string
- Content-Type: text/html
- Content-Range: bytes 0-50/1270
-
- <data>
- --magic_string
- Content-Type: text/html
- Content-Range: bytes 100-150/1270
-
- <data>
- --magic_string--
-
- In our application, each part is the content of a file. This class iterates
- the files.
-
- Args:
- response: An instance of requests.response.
- file_name_map: A dict of {(<start:str>, <size:int>): filename, ...}.
-
- Yields:
- A pair of (name, content) of the file.
-
- Raises:
- FormatError: Raised when response content interrupted.
- """
- reader = _data_reader(
- response.iter_content(constants.READ_BUFFER_SIZE_BYTES))
- reader.next() # initialize the coroutine
-
- _read_empty_line(reader) # The first line is empty.
- while True:
- _read_line(reader) # The second line is the boundary.
- _read_line(reader) # The line sub content type.
- sub_range_header = _read_line(reader) # The line of sub content range.
- if sub_range_header is None:
- break
- _read_empty_line(reader) # Another empty line.
-
- filename, size = _get_file_by_range_header(sub_range_header,
- file_name_map)
- content = _read_bytes(reader, size)
-
- _read_empty_line(reader) # Every content has a trailing '\r\n'.
-
- bytes_read = 0 if content is None else len(content)
- if bytes_read != size:
- raise FormatError(
- '%s: Error in reading content (read %d B, expect %d B)' %
- (filename, bytes_read, size)
- )
-
- yield filename, content
diff --git a/gs_cache/tests/range_response_test.py b/gs_cache/tests/range_response_test.py
deleted file mode 100644
index 8f51649..0000000
--- a/gs_cache/tests/range_response_test.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2018 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Tests of range_response."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import json
-import unittest
-
-import mock
-
-import range_response
-import tarfile_utils
-
-
-# pylint: disable=protected-access
-class JsonStreamerBasicTest(unittest.TestCase):
- """Basic test case for range_response.JsonStreamer."""
-
- def setUp(self):
- self.streamer = range_response.JsonStreamer()
- self.single_part_response = mock.MagicMock()
- self.single_part_response.headers = {'Content-Range': 'bytes 100-1099/*'}
- self.single_part_response.content = 'A' * 1000
- self.file_info_list = [tarfile_utils.TarMemberInfo('foo', '', '', '100',
- '1000')]
-
- def test_single_part_response(self):
- """Test handling of single-part response."""
- self.streamer.queue_response(self.single_part_response, self.file_info_list)
- result = json.loads(''.join(self.streamer.stream()))
- self.assertDictEqual(result, {'foo': 'A' * 1000})
-
- def test_add_response_not_for_range_request(self):
- """Test add response which not for range request."""
- response = mock.MagicMock()
- response.headers = {}
- with self.assertRaises(range_response.FormatError):
- self.streamer.queue_response(response, [])
-
- def test_add_two_single_part_response(self):
- """Test adding two single-part response."""
- self.streamer.queue_response(self.single_part_response, self.file_info_list)
- with self.assertRaises(range_response.ResponseQueueError):
- self.streamer.queue_response(self.single_part_response, [])
-
- def test_add_single_part_after_a_multi_part(self):
- """Test adding a single-part response after some multi-part responses."""
- response = mock.MagicMock()
- response.headers = {
- 'Content-Type': 'multipart/byteranges; boundary=boundary',
- }
- response.iter_content.return_value = iter([''])
- self.streamer.queue_response(response, self.file_info_list)
-
- with self.assertRaises(range_response.ResponseQueueError):
- self.streamer.queue_response(self.single_part_response, [])
-
-
-class MultiPartResponseTest(unittest.TestCase):
- """Test class for handling one response of multi-part range request."""
-
- def setUp(self):
- self.response = mock.MagicMock()
- self.response.headers = {
- 'Content-Type': 'multipart/byteranges; boundary=boundary',
- }
- self.file_info_list = [
- tarfile_utils.TarMemberInfo('foo', '', '', '10', '10'),
- tarfile_utils.TarMemberInfo('bar', '', '', '123', '1000')]
-
- self.streamer = range_response.JsonStreamer()
-
- self.good_response = [
- '\r\nboundary\r\nContent-Type: some/type\r',
- '\nContent-Range: bytes 10-19/T\r\n\r\n012',
- '3456789\r\nboundary\r\nContent-Type: some',
- '/type\r\nContent-Range: bytes 123-1122/T\r'
- '\n\r\n' + 'a' * 400,
- 'a' * 600,
- '\r\nboundary--\r\n',
- ]
-
- def test_stream__empty_response(self):
- """Test streaming empty response."""
- self.response.iter_content.return_value = iter([''])
- self.streamer.queue_response(self.response, self.file_info_list)
- with self.assertRaises(range_response.FormatError):
- ''.join(self.streamer.stream())
-
- def test_stream__multipart_ranges(self):
- """Test streaming files in one response."""
- self.response.iter_content.return_value = iter(self.good_response)
- self.streamer.queue_response(self.response, self.file_info_list)
- result = json.loads(''.join(self.streamer.stream()))
- self.assertDictEqual(result, {'foo': '0123456789', 'bar': 'a' * 1000})
-
- def test_stream__two_multipart_ranges(self):
- """Test streaming files in two responses."""
- self.response.iter_content.return_value = iter(self.good_response)
- self.streamer.queue_response(self.response, self.file_info_list)
-
- response2 = mock.MagicMock()
- response2.headers = self.response.headers
- response2.iter_content.return_value = iter(self.good_response)
- self.streamer.queue_response(
- response2,
- [tarfile_utils.TarMemberInfo('FOO', '', '', '10', '10'),
- tarfile_utils.TarMemberInfo('BAR', '', '', '123', '1000')])
-
- result = json.loads(''.join(self.streamer.stream()))
- self.assertDictEqual(result, {'foo': '0123456789', 'FOO': '0123456789',
- 'bar': 'a' * 1000, 'BAR': 'a' * 1000})
-
- def test_stream__file_not_found(self):
- """Test streaming which cannot find file names."""
- self.response.iter_content.return_value = iter([
- '\r\nboundary\r\nContent-Type: some/type\r',
- '\nContent-Range: bytes 10-19/T\r\n\r\n012',
- '3456789\r\n',
- '\r\nboundary--\r\n',
- ])
- self.streamer.queue_response(self.response, [])
- with self.assertRaises(range_response.NoFileFoundError):
- list(self.streamer.stream())
-
- def test_stream__bad_sub_range_header(self):
- """Test streaming with bad range header."""
- self.response.iter_content.return_value = iter([
- '\r\nboundary\r\nContent-Type: some/type\r',
- '\nContent-RangeXXXXXXXXXXXXXXX'
- ])
- self.streamer.queue_response(self.response, [])
- with self.assertRaises(range_response.FormatError):
- list(self.streamer.stream())
-
- def test_stream__bad_size(self):
- """Test streaming with bad file size."""
- self.response.iter_content.return_value = iter([
- '\r\nboundary\r\nContent-Type: some/type\r',
- '\nContent-Range: bytes 10-19/T\r\n\r\n012',
- '34\r\n',
- '\r\nboundary--\r\n',
- ])
- self.streamer.queue_response(self.response, self.file_info_list)
- with self.assertRaises(range_response.FormatError):
- list(self.streamer.stream())
-
- def test_stream__single_range(self):
- """Test formatting a single range response."""
- self.response.headers = {'Content-Type': 'some/type',
- 'Content-Range': 'bytes 10-19/*'}
- self.response.content = 'x' * 10
- self.streamer.queue_response(self.response, self.file_info_list)
- result = ''.join(self.streamer.stream())
- self.assertEqual(result, json.dumps({'foo': self.response.content}))