Formatting: Format all python code with black.
This CL is probably not what you're looking for, it's only
automated formatting. Ignore it with
`git blame --ignore-rev <revision>` for this commit.
BUG=b:233893248
TEST=CQ
Change-Id: I66591d7a738d241aed3290138c0f68065ab10a6d
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/3879174
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Tested-by: Alex Klein <saklein@chromium.org>
diff --git a/api/field_handler.py b/api/field_handler.py
index 018a861..c958566 100644
--- a/api/field_handler.py
+++ b/api/field_handler.py
@@ -23,379 +23,420 @@
if TYPE_CHECKING:
- from chromite.lib import chroot_lib
+ from chromite.lib import chroot_lib
class Error(Exception):
- """Base error class for the module."""
+ """Base error class for the module."""
class InvalidResultPathError(Error):
- """Result path is invalid."""
+ """Result path is invalid."""
class ChrootHandler(object):
- """Translate a Chroot message to chroot enter arguments and env."""
+ """Translate a Chroot message to chroot enter arguments and env."""
- def __init__(self, clear_field):
- self.clear_field = clear_field
+ def __init__(self, clear_field):
+ self.clear_field = clear_field
- def handle(self, message, recurse=True) -> Optional['chroot_lib.Chroot']:
- """Parse a message for a chroot field."""
- # Find the Chroot field. Search for the field by type to prevent it being
- # tied to a naming convention.
- for descriptor in message.DESCRIPTOR.fields:
- field = getattr(message, descriptor.name)
- if isinstance(field, common_pb2.Chroot):
- chroot = field
- if self.clear_field:
- message.ClearField(descriptor.name)
- return self.parse_chroot(chroot)
+ def handle(self, message, recurse=True) -> Optional["chroot_lib.Chroot"]:
+ """Parse a message for a chroot field."""
+ # Find the Chroot field. Search for the field by type to prevent it being
+ # tied to a naming convention.
+ for descriptor in message.DESCRIPTOR.fields:
+ field = getattr(message, descriptor.name)
+ if isinstance(field, common_pb2.Chroot):
+ chroot = field
+ if self.clear_field:
+ message.ClearField(descriptor.name)
+ return self.parse_chroot(chroot)
- # Recurse down one level. This is handy for meta-endpoints that use another
- # endpoint's request to produce data for or about the second endpoint.
- # e.g. PackageService/NeedsChromeSource.
- if recurse:
- for descriptor in message.DESCRIPTOR.fields:
- field = getattr(message, descriptor.name)
- if isinstance(field, protobuf_message.Message):
- chroot = self.handle(field, recurse=False)
- if chroot:
- return chroot
+ # Recurse down one level. This is handy for meta-endpoints that use another
+ # endpoint's request to produce data for or about the second endpoint.
+ # e.g. PackageService/NeedsChromeSource.
+ if recurse:
+ for descriptor in message.DESCRIPTOR.fields:
+ field = getattr(message, descriptor.name)
+ if isinstance(field, protobuf_message.Message):
+ chroot = self.handle(field, recurse=False)
+ if chroot:
+ return chroot
- return None
+ return None
- def parse_chroot(self,
- chroot_message: common_pb2.Chroot) -> 'chroot_lib.Chroot':
- """Parse a Chroot message instance."""
- return controller_util.ParseChroot(chroot_message)
+ def parse_chroot(
+ self, chroot_message: common_pb2.Chroot
+ ) -> "chroot_lib.Chroot":
+ """Parse a Chroot message instance."""
+ return controller_util.ParseChroot(chroot_message)
-def handle_chroot(message: protobuf_message.Message,
- clear_field: Optional[bool] = True) -> 'chroot_lib.Chroot':
- """Find and parse the chroot field, returning the Chroot instance."""
- handler = ChrootHandler(clear_field)
- chroot = handler.handle(message)
- if chroot:
- return chroot
+def handle_chroot(
+ message: protobuf_message.Message, clear_field: Optional[bool] = True
+) -> "chroot_lib.Chroot":
+ """Find and parse the chroot field, returning the Chroot instance."""
+ handler = ChrootHandler(clear_field)
+ chroot = handler.handle(message)
+ if chroot:
+ return chroot
- logging.warning('No chroot message found, falling back to defaults.')
- return handler.parse_chroot(common_pb2.Chroot())
+ logging.warning("No chroot message found, falling back to defaults.")
+ return handler.parse_chroot(common_pb2.Chroot())
def handle_goma(message, chroot_path):
- """Find and parse the GomaConfig field, returning the Goma instance."""
- for descriptor in message.DESCRIPTOR.fields:
- field = getattr(message, descriptor.name)
- if isinstance(field, common_pb2.GomaConfig):
- goma_config = field
- return controller_util.ParseGomaConfig(goma_config, chroot_path)
+ """Find and parse the GomaConfig field, returning the Goma instance."""
+ for descriptor in message.DESCRIPTOR.fields:
+ field = getattr(message, descriptor.name)
+ if isinstance(field, common_pb2.GomaConfig):
+ goma_config = field
+ return controller_util.ParseGomaConfig(goma_config, chroot_path)
- return None
+ return None
def handle_remoteexec(message: protobuf_message.Message):
- """Find the RemoteexecConfig field, returning the Remoteexec instance."""
- for descriptor in message.DESCRIPTOR.fields:
- field = getattr(message, descriptor.name)
- if isinstance(field, common_pb2.RemoteexecConfig):
- remoteexec_config = field
- return controller_util.ParseRemoteexecConfig(remoteexec_config)
+ """Find the RemoteexecConfig field, returning the Remoteexec instance."""
+ for descriptor in message.DESCRIPTOR.fields:
+ field = getattr(message, descriptor.name)
+ if isinstance(field, common_pb2.RemoteexecConfig):
+ remoteexec_config = field
+ return controller_util.ParseRemoteexecConfig(remoteexec_config)
- return None
+ return None
class PathHandler(object):
- """Handles copying a file or directory into or out of the chroot."""
+ """Handles copying a file or directory into or out of the chroot."""
- INSIDE = common_pb2.Path.INSIDE
- OUTSIDE = common_pb2.Path.OUTSIDE
+ INSIDE = common_pb2.Path.INSIDE
+ OUTSIDE = common_pb2.Path.OUTSIDE
- def __init__(self,
- field: common_pb2.Path,
- destination: str,
- delete: bool,
- prefix: Optional[str] = None,
- reset: Optional[bool] = True) -> None:
- """Path handler initialization.
+ def __init__(
+ self,
+ field: common_pb2.Path,
+ destination: str,
+ delete: bool,
+ prefix: Optional[str] = None,
+ reset: Optional[bool] = True,
+ ) -> None:
+ """Path handler initialization.
- Args:
- field: The Path message.
- destination: The destination base path.
- delete: Whether the copied file(s) should be deleted on cleanup.
- prefix: A path prefix to remove from the destination path when moving
- files inside the chroot, or to add to the source paths when moving files
- out of the chroot.
- reset: Whether to reset the state on cleanup.
- """
- assert isinstance(field, common_pb2.Path)
- assert field.path
- assert field.location
+ Args:
+ field: The Path message.
+ destination: The destination base path.
+ delete: Whether the copied file(s) should be deleted on cleanup.
+ prefix: A path prefix to remove from the destination path when moving
+ files inside the chroot, or to add to the source paths when moving files
+ out of the chroot.
+ reset: Whether to reset the state on cleanup.
+ """
+ assert isinstance(field, common_pb2.Path)
+ assert field.path
+ assert field.location
- self.field = field
- self.destination = destination
- self.prefix = '' if prefix is None else str(prefix)
- self.delete = delete
- self.tempdir = None
- self.reset = reset
+ self.field = field
+ self.destination = destination
+ self.prefix = "" if prefix is None else str(prefix)
+ self.delete = delete
+ self.tempdir = None
+ self.reset = reset
- # For resetting the state.
- self._transferred = False
- self._original_message = common_pb2.Path()
- self._original_message.CopyFrom(self.field)
+ # For resetting the state.
+ self._transferred = False
+ self._original_message = common_pb2.Path()
+ self._original_message.CopyFrom(self.field)
- def transfer(self, direction: int) -> None:
- """Copy the file or directory to its destination.
+ def transfer(self, direction: int) -> None:
+ """Copy the file or directory to its destination.
- Args:
- direction: The direction files are being copied (into or out of the
- chroot). Specifying the direction allows avoiding performing unnecessary
- copies.
- """
- if self._transferred:
- return
+ Args:
+ direction: The direction files are being copied (into or out of the
+ chroot). Specifying the direction allows avoiding performing unnecessary
+ copies.
+ """
+ if self._transferred:
+ return
- assert direction in [self.INSIDE, self.OUTSIDE]
+ assert direction in [self.INSIDE, self.OUTSIDE]
- if self.field.location == direction:
- # Already in the correct location, nothing to do.
- return
+ if self.field.location == direction:
+ # Already in the correct location, nothing to do.
+ return
- # Create a tempdir for the copied file if we're cleaning it up afterwords.
- if self.delete:
- self.tempdir = osutils.TempDir(base_dir=self.destination)
- destination = self.tempdir.tempdir
- else:
- destination = self.destination
+ # Create a tempdir for the copied file if we're cleaning it up afterwords.
+ if self.delete:
+ self.tempdir = osutils.TempDir(base_dir=self.destination)
+ destination = self.tempdir.tempdir
+ else:
+ destination = self.destination
- source = self.field.path
- if direction == self.OUTSIDE and self.prefix:
- # When we're extracting files, we need /tmp/result to be
- # /path/to/chroot/tmp/result.
- source = os.path.join(self.prefix, source.lstrip(os.sep))
+ source = self.field.path
+ if direction == self.OUTSIDE and self.prefix:
+ # When we're extracting files, we need /tmp/result to be
+ # /path/to/chroot/tmp/result.
+ source = os.path.join(self.prefix, source.lstrip(os.sep))
- if os.path.isfile(source):
- # File - use the old file name, just copy it into the destination.
- dest_path = os.path.join(destination, os.path.basename(source))
- copy_fn = shutil.copy
- else:
- # Directory - just copy everything into the new location.
- dest_path = destination
- copy_fn = functools.partial(osutils.CopyDirContents, allow_nonempty=True)
+ if os.path.isfile(source):
+ # File - use the old file name, just copy it into the destination.
+ dest_path = os.path.join(destination, os.path.basename(source))
+ copy_fn = shutil.copy
+ else:
+ # Directory - just copy everything into the new location.
+ dest_path = destination
+ copy_fn = functools.partial(
+ osutils.CopyDirContents, allow_nonempty=True
+ )
- logging.debug('Copying %s to %s', source, dest_path)
- copy_fn(source, dest_path)
+ logging.debug("Copying %s to %s", source, dest_path)
+ copy_fn(source, dest_path)
- # Clean up the destination path for returning, if applicable.
- return_path = dest_path
- if direction == self.INSIDE and return_path.startswith(self.prefix):
- return_path = return_path[len(self.prefix):]
+ # Clean up the destination path for returning, if applicable.
+ return_path = dest_path
+ if direction == self.INSIDE and return_path.startswith(self.prefix):
+ return_path = return_path[len(self.prefix) :]
- self.field.path = return_path
- self.field.location = direction
- self._transferred = True
+ self.field.path = return_path
+ self.field.location = direction
+ self._transferred = True
- def cleanup(self):
- if self.tempdir:
- self.tempdir.Cleanup()
- self.tempdir = None
+ def cleanup(self):
+ if self.tempdir:
+ self.tempdir.Cleanup()
+ self.tempdir = None
- if self.reset:
- self.field.CopyFrom(self._original_message)
+ if self.reset:
+ self.field.CopyFrom(self._original_message)
class SyncedDirHandler(object):
- """Handler for syncing directories across the chroot boundary."""
+ """Handler for syncing directories across the chroot boundary."""
- def __init__(self, field, destination, prefix):
- self.field = field
- self.prefix = prefix
+ def __init__(self, field, destination, prefix):
+ self.field = field
+ self.prefix = prefix
- self.source = self.field.dir
- if not self.source.endswith(os.sep):
- self.source += os.sep
+ self.source = self.field.dir
+ if not self.source.endswith(os.sep):
+ self.source += os.sep
- self.destination = destination
- if not self.destination.endswith(os.sep):
- self.destination += os.sep
+ self.destination = destination
+ if not self.destination.endswith(os.sep):
+ self.destination += os.sep
- # For resetting the message later.
- self._original_message = common_pb2.SyncedDir()
- self._original_message.CopyFrom(self.field)
+ # For resetting the message later.
+ self._original_message = common_pb2.SyncedDir()
+ self._original_message.CopyFrom(self.field)
- def _sync(self, src, dest):
- logging.info('Syncing %s to %s', src, dest)
- # TODO: This would probably be more efficient with rsync.
- osutils.EmptyDir(dest)
- osutils.CopyDirContents(src, dest)
+ def _sync(self, src, dest):
+ logging.info("Syncing %s to %s", src, dest)
+ # TODO: This would probably be more efficient with rsync.
+ osutils.EmptyDir(dest)
+ osutils.CopyDirContents(src, dest)
- def sync_in(self):
- """Sync files from the source directory to the destination directory."""
- self._sync(self.source, self.destination)
- self.field.dir = '/%s' % os.path.relpath(self.destination, self.prefix)
+ def sync_in(self):
+ """Sync files from the source directory to the destination directory."""
+ self._sync(self.source, self.destination)
+ self.field.dir = "/%s" % os.path.relpath(self.destination, self.prefix)
- def sync_out(self):
- """Sync files from the destination directory to the source directory."""
- self._sync(self.destination, self.source)
- self.field.CopyFrom(self._original_message)
+ def sync_out(self):
+ """Sync files from the destination directory to the source directory."""
+ self._sync(self.destination, self.source)
+ self.field.CopyFrom(self._original_message)
@contextlib.contextmanager
-def copy_paths_in(message: protobuf_message.Message,
- destination: str,
- delete: Optional[bool] = True,
- prefix: Optional[str] = None) -> Iterator[List[PathHandler]]:
- """Context manager function to transfer and cleanup all Path messages.
+def copy_paths_in(
+ message: protobuf_message.Message,
+ destination: str,
+ delete: Optional[bool] = True,
+ prefix: Optional[str] = None,
+) -> Iterator[List[PathHandler]]:
+ """Context manager function to transfer and cleanup all Path messages.
- Args:
- message: A message whose Path messages should be transferred.
- destination: The base destination path.
- delete: Whether the file(s) should be deleted.
- prefix: A prefix path to remove from the final destination path in the Path
- message (i.e. remove the chroot path).
+ Args:
+ message: A message whose Path messages should be transferred.
+ destination: The base destination path.
+ delete: Whether the file(s) should be deleted.
+ prefix: A prefix path to remove from the final destination path in the Path
+ message (i.e. remove the chroot path).
- Yields:
- list[PathHandler]: The path handlers.
- """
- assert destination
+ Yields:
+ list[PathHandler]: The path handlers.
+ """
+ assert destination
- handlers = _extract_handlers(message, destination, prefix, delete=delete,
- reset=True)
+ handlers = _extract_handlers(
+ message, destination, prefix, delete=delete, reset=True
+ )
- for handler in handlers:
- handler.transfer(PathHandler.INSIDE)
-
- try:
- yield handlers
- finally:
for handler in handlers:
- handler.cleanup()
+ handler.transfer(PathHandler.INSIDE)
+
+ try:
+ yield handlers
+ finally:
+ for handler in handlers:
+ handler.cleanup()
@contextlib.contextmanager
-def sync_dirs(message: protobuf_message.Message,
- destination: str,
- prefix: str) -> Iterator[SyncedDirHandler]:
- """Context manager function to handle SyncedDir messages.
+def sync_dirs(
+ message: protobuf_message.Message, destination: str, prefix: str
+) -> Iterator[SyncedDirHandler]:
+ """Context manager function to handle SyncedDir messages.
- The sync semantics are effectively:
- rsync -r --del source/ destination/
- * The endpoint runs. *
- rsync -r --del destination/ source/
+ The sync semantics are effectively:
+ rsync -r --del source/ destination/
+ * The endpoint runs. *
+ rsync -r --del destination/ source/
- Args:
- message: A message whose SyncedPath messages should be synced.
- destination: The destination path.
- prefix: A prefix path to remove from the final destination path in the Path
- message (i.e. remove the chroot path).
+ Args:
+ message: A message whose SyncedPath messages should be synced.
+ destination: The destination path.
+ prefix: A prefix path to remove from the final destination path in the Path
+ message (i.e. remove the chroot path).
- Yields:
- The handlers.
- """
- assert destination
+ Yields:
+ The handlers.
+ """
+ assert destination
- handlers = _extract_handlers(message, destination, prefix=prefix,
- delete=False, reset=True,
- message_type=common_pb2.SyncedDir)
+ handlers = _extract_handlers(
+ message,
+ destination,
+ prefix=prefix,
+ delete=False,
+ reset=True,
+ message_type=common_pb2.SyncedDir,
+ )
- for handler in handlers:
- handler.sync_in()
-
- try:
- yield handlers
- finally:
for handler in handlers:
- handler.sync_out()
+ handler.sync_in()
+
+ try:
+ yield handlers
+ finally:
+ for handler in handlers:
+ handler.sync_out()
-def extract_results(request_message: protobuf_message.Message,
- response_message: protobuf_message.Message,
- chroot: 'chroot_lib.Chroot') -> None:
- """Transfer all response Path messages to the request's ResultPath.
+def extract_results(
+ request_message: protobuf_message.Message,
+ response_message: protobuf_message.Message,
+ chroot: "chroot_lib.Chroot",
+) -> None:
+ """Transfer all response Path messages to the request's ResultPath.
- Args:
- request_message: The request message containing a ResultPath message.
- response_message: The response message whose Path message(s) are to be
- transferred.
- chroot: The chroot the files are being copied out of.
- """
- # Find the ResultPath.
- for descriptor in request_message.DESCRIPTOR.fields:
- field = getattr(request_message, descriptor.name)
- if isinstance(field, common_pb2.ResultPath):
- result_path_message = field
- break
- else:
- # No ResultPath to handle.
- return
-
- destination = result_path_message.path.path
- handlers = _extract_handlers(response_message, destination, chroot.path,
- delete=False, reset=False)
-
- for handler in handlers:
- handler.transfer(PathHandler.OUTSIDE)
- handler.cleanup()
-
-
-def _extract_handlers(message, destination, prefix, delete=False, reset=False,
- field_name=None, message_type=None):
- """Recursive helper for handle_paths to extract Path messages."""
- message_type = message_type or common_pb2.Path
- is_path_target = message_type is common_pb2.Path
- is_synced_target = message_type is common_pb2.SyncedDir
-
- is_message = isinstance(message, protobuf_message.Message)
- is_result_path = isinstance(message, common_pb2.ResultPath)
- if not is_message or is_result_path:
- # Base case: Nothing to handle.
- # There's nothing we can do with scalar values.
- # Skip ResultPath instances to avoid unnecessary file copying.
- return []
- elif is_path_target and isinstance(message, common_pb2.Path):
- # Base case: Create handler for this message.
- if not message.path or not message.location:
- logging.debug('Skipping %s; incomplete.', field_name or 'message')
- return []
-
- handler = PathHandler(message, destination, delete=delete, prefix=prefix,
- reset=reset)
- return [handler]
- elif is_synced_target and isinstance(message, common_pb2.SyncedDir):
- if not message.dir:
- logging.debug('Skipping %s; no directory given.', field_name or 'message')
- return []
-
- handler = SyncedDirHandler(message, destination, prefix)
- return [handler]
-
- # Iterate through each field and recurse.
- handlers = []
- for descriptor in message.DESCRIPTOR.fields:
- field = getattr(message, descriptor.name)
- if field_name:
- new_field_name = '%s.%s' % (field_name, descriptor.name)
+ Args:
+ request_message: The request message containing a ResultPath message.
+ response_message: The response message whose Path message(s) are to be
+ transferred.
+ chroot: The chroot the files are being copied out of.
+ """
+ # Find the ResultPath.
+ for descriptor in request_message.DESCRIPTOR.fields:
+ field = getattr(request_message, descriptor.name)
+ if isinstance(field, common_pb2.ResultPath):
+ result_path_message = field
+ break
else:
- new_field_name = descriptor.name
+ # No ResultPath to handle.
+ return
- if isinstance(field, protobuf_message.Message):
- # Recurse for nested Paths.
- handlers.extend(
- _extract_handlers(field, destination, prefix, delete, reset,
- field_name=new_field_name,
- message_type=message_type))
- else:
- # If it's iterable it may be a repeated field, try each element.
- try:
- iterator = iter(field)
- except TypeError:
- # Definitely not a repeated field, just move on.
- continue
+ destination = result_path_message.path.path
+ handlers = _extract_handlers(
+ response_message, destination, chroot.path, delete=False, reset=False
+ )
- for element in iterator:
- handlers.extend(
- _extract_handlers(element, destination, prefix, delete, reset,
- field_name=new_field_name,
- message_type=message_type))
+ for handler in handlers:
+ handler.transfer(PathHandler.OUTSIDE)
+ handler.cleanup()
- return handlers
+
+def _extract_handlers(
+ message,
+ destination,
+ prefix,
+ delete=False,
+ reset=False,
+ field_name=None,
+ message_type=None,
+):
+ """Recursive helper for handle_paths to extract Path messages."""
+ message_type = message_type or common_pb2.Path
+ is_path_target = message_type is common_pb2.Path
+ is_synced_target = message_type is common_pb2.SyncedDir
+
+ is_message = isinstance(message, protobuf_message.Message)
+ is_result_path = isinstance(message, common_pb2.ResultPath)
+ if not is_message or is_result_path:
+ # Base case: Nothing to handle.
+ # There's nothing we can do with scalar values.
+ # Skip ResultPath instances to avoid unnecessary file copying.
+ return []
+ elif is_path_target and isinstance(message, common_pb2.Path):
+ # Base case: Create handler for this message.
+ if not message.path or not message.location:
+ logging.debug("Skipping %s; incomplete.", field_name or "message")
+ return []
+
+ handler = PathHandler(
+ message, destination, delete=delete, prefix=prefix, reset=reset
+ )
+ return [handler]
+ elif is_synced_target and isinstance(message, common_pb2.SyncedDir):
+ if not message.dir:
+ logging.debug(
+ "Skipping %s; no directory given.", field_name or "message"
+ )
+ return []
+
+ handler = SyncedDirHandler(message, destination, prefix)
+ return [handler]
+
+ # Iterate through each field and recurse.
+ handlers = []
+ for descriptor in message.DESCRIPTOR.fields:
+ field = getattr(message, descriptor.name)
+ if field_name:
+ new_field_name = "%s.%s" % (field_name, descriptor.name)
+ else:
+ new_field_name = descriptor.name
+
+ if isinstance(field, protobuf_message.Message):
+ # Recurse for nested Paths.
+ handlers.extend(
+ _extract_handlers(
+ field,
+ destination,
+ prefix,
+ delete,
+ reset,
+ field_name=new_field_name,
+ message_type=message_type,
+ )
+ )
+ else:
+ # If it's iterable it may be a repeated field, try each element.
+ try:
+ iterator = iter(field)
+ except TypeError:
+ # Definitely not a repeated field, just move on.
+ continue
+
+ for element in iterator:
+ handlers.extend(
+ _extract_handlers(
+ element,
+ destination,
+ prefix,
+ delete,
+ reset,
+ field_name=new_field_name,
+ message_type=message_type,
+ )
+ )
+
+ return handlers