cli: Warn and prompt when USE flags have changed
When executing `cros deploy --update`, check whether the new package's
USE flags match those from the package already on the DUT. If not, warn
and prompt the user to confirm they want to proceed.
BUG=b:187783171
TEST=./run_tests cli/deploy_unittest.py and `cros deploy` xattr to a DUT
Change-Id: I46a4c2ff01e605122f5e0124bb58755b78e86edf
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/4460194
Tested-by: Tim Bain <tbain@google.com>
Commit-Queue: Tim Bain <tbain@google.com>
Reviewed-by: Ram Chandrasekar <rchandrasekar@google.com>
diff --git a/cli/deploy.py b/cli/deploy.py
index 5e48648..50d04d4 100644
--- a/cli/deploy.py
+++ b/cli/deploy.py
@@ -18,6 +18,7 @@
import os
from pathlib import Path
import tempfile
+from typing import Any, Dict, List, Optional, Set, Tuple
from chromite.cli import command
from chromite.lib import build_target_lib
@@ -80,11 +81,11 @@
"Please restart any updated",
)
- def __init__(self, emerge):
+ def __init__(self, emerge: bool):
"""Construct BrilloDeployOperation object.
Args:
- emerge: True if emerge, False is unmerge.
+ emerge: True if emerge, False if unmerge.
"""
super().__init__()
if emerge:
@@ -127,14 +128,28 @@
class PkgInfo(object):
"""A record containing package information."""
- __slots__ = ("cpv", "build_time", "rdeps_raw", "rdeps", "rev_rdeps")
+ __slots__ = (
+ "cpv",
+ "build_time",
+ "rdeps_raw",
+ "use",
+ "rdeps",
+ "rev_rdeps",
+ )
def __init__(
- self, cpv, build_time, rdeps_raw, rdeps=None, rev_rdeps=None
+ self,
+ cpv: package_info.CPV,
+ build_time: int,
+ rdeps_raw: str,
+ use: str,
+ rdeps: set = None,
+ rev_rdeps: set = None,
):
self.cpv = cpv
self.build_time = build_time
self.rdeps_raw = rdeps_raw
+ self.use = use
self.rdeps = set() if rdeps is None else rdeps
self.rev_rdeps = set() if rev_rdeps is None else rev_rdeps
@@ -153,14 +168,14 @@
vartree = trees[target_root]['vartree']
pkg_info = []
for cpv in vartree.dbapi.cpv_all():
- slot, rdep_raw, build_time = vartree.dbapi.aux_get(
- cpv, ('SLOT', 'RDEPEND', 'BUILD_TIME'))
- pkg_info.append((cpv, slot, rdep_raw, build_time))
+ slot, rdep_raw, build_time, use = vartree.dbapi.aux_get(
+ cpv, ('SLOT', 'RDEPEND', 'BUILD_TIME', 'USE'))
+ pkg_info.append((cpv, slot, rdep_raw, build_time, use))
print(json.dumps(pkg_info))
"""
- def __init__(self, sysroot):
+ def __init__(self, sysroot: str):
self.sysroot = sysroot
# Members containing the sysroot (binpkg) and target (installed) package
# DB.
@@ -172,7 +187,7 @@
self.listed = None
@staticmethod
- def _GetCP(cpv):
+ def _GetCP(cpv: package_info.CPV) -> str:
"""Returns the CP value for a given CPV string."""
attrs = package_info.SplitCPV(cpv, strict=False)
if not attrs.cp:
@@ -180,18 +195,18 @@
return attrs.cp
@staticmethod
- def _InDB(cp, slot, db):
+ def _InDB(cp: str, slot: str, db: Dict[str, Dict[str, PkgInfo]]) -> bool:
"""Returns whether CP and slot are found in a database (if provided)."""
cp_slots = db.get(cp) if db else None
return cp_slots is not None and (not slot or slot in cp_slots)
@staticmethod
- def _AtomStr(cp, slot):
+ def _AtomStr(cp: str, slot: str) -> str:
"""Returns 'CP:slot' if slot is non-empty, else just 'CP'."""
return "%s:%s" % (cp, slot) if slot else cp
@classmethod
- def _GetVartreeSnippet(cls, root="/"):
+ def _GetVartreeSnippet(cls, root: str = "/") -> str:
"""Returns a code snippet for dumping the vartree on the target.
Args:
@@ -203,7 +218,9 @@
return cls._GET_VARTREE % {"root": root}
@classmethod
- def _StripDepAtom(cls, dep_atom, installed_db=None):
+ def _StripDepAtom(
+ cls, dep_atom: str, installed_db: Dict[str, Dict[str, PkgInfo]] = None
+ ) -> Tuple[str, str]:
"""Strips a dependency atom and returns a (CP, slot) pair."""
# TODO(garnold) This is a gross simplification of ebuild dependency
# semantics, stripping and ignoring various qualifiers (versions, slots,
@@ -248,7 +265,12 @@
return cp, slot
@classmethod
- def _ProcessDepStr(cls, dep_str, installed_db, avail_db):
+ def _ProcessDepStr(
+ cls,
+ dep_str: str,
+ installed_db: Dict[str, Dict[str, PkgInfo]],
+ avail_db: Dict[str, Dict[str, PkgInfo]],
+ ) -> set:
"""Resolves and returns a list of dependencies from a dependency string.
This parses a dependency string and returns a list of package names and
@@ -269,7 +291,9 @@
ValueError: the dependencies string is malformed.
"""
- def ProcessSubDeps(dep_exp, disjunct):
+ def ProcessSubDeps(
+ dep_exp: Set[Tuple[str, str]], disjunct: bool
+ ) -> Set[Tuple[str, str]]:
"""Parses and processes a dependency (sub)expression."""
deps = set()
default_deps = set()
@@ -327,8 +351,12 @@
raise ValueError("%s: %s" % (e, dep_str))
def _BuildDB(
- self, cpv_info, process_rdeps, process_rev_rdeps, installed_db=None
- ):
+ self,
+ cpv_info: List[Tuple[Dict[str, package_info.CPV], str, str, int, str]],
+ process_rdeps: bool,
+ process_rev_rdeps: bool,
+ installed_db: Dict[str, Dict[str, PkgInfo]] = None,
+ ) -> Dict[str, Dict[str, PkgInfo]]:
"""Returns a database of packages given a list of CPV info.
Args:
@@ -349,7 +377,7 @@
"""
db = {}
logging.debug("Populating package DB...")
- for cpv, slot, rdeps_raw, build_time in cpv_info:
+ for cpv, slot, rdeps_raw, build_time, use in cpv_info:
cp = self._GetCP(cpv)
cp_slots = db.setdefault(cp, dict())
if slot in cp_slots:
@@ -364,7 +392,7 @@
build_time,
rdeps_raw,
)
- cp_slots[slot] = self.PkgInfo(cpv, build_time, rdeps_raw)
+ cp_slots[slot] = self.PkgInfo(cpv, build_time, rdeps_raw, use)
avail_db = db
if installed_db is None:
@@ -417,7 +445,13 @@
return db
- def _InitTargetVarDB(self, device, root, process_rdeps, process_rev_rdeps):
+ def _InitTargetVarDB(
+ self,
+ device: remote_access.RemoteDevice,
+ root: str,
+ process_rdeps: bool,
+ process_rev_rdeps: bool,
+ ) -> None:
"""Initializes a dictionary of packages installed on |device|."""
get_vartree_script = self._GetVartreeSnippet(root)
try:
@@ -435,7 +469,7 @@
except ValueError as e:
raise self.VartreeError(str(e))
- def _InitBinpkgDB(self, process_rdeps):
+ def _InitBinpkgDB(self, process_rdeps: bool) -> None:
"""Initializes a dictionary of binpkgs for updating the target."""
# Get build root trees; portage indexes require a trailing '/'.
build_root = os.path.join(self.sysroot, "")
@@ -445,10 +479,10 @@
bintree = trees[build_root]["bintree"]
binpkgs_info = []
for cpv in bintree.dbapi.cpv_all():
- slot, rdep_raw, build_time = bintree.dbapi.aux_get(
- cpv, ["SLOT", "RDEPEND", "BUILD_TIME"]
+ slot, rdep_raw, build_time, use = bintree.dbapi.aux_get(
+ cpv, ["SLOT", "RDEPEND", "BUILD_TIME", "USE"]
)
- binpkgs_info.append((cpv, slot, rdep_raw, build_time))
+ binpkgs_info.append((cpv, slot, rdep_raw, build_time, use))
try:
self.binpkgs_db = self._BuildDB(
@@ -457,13 +491,13 @@
except ValueError as e:
raise self.BintreeError(str(e))
- def _InitDepQueue(self):
+ def _InitDepQueue(self) -> None:
"""Initializes the dependency work queue."""
self.queue = set()
self.seen = {}
self.listed = set()
- def _EnqDep(self, dep, listed, optional):
+ def _EnqDep(self, dep: str, listed: bool, optional: bool) -> bool:
"""Enqueues a dependency if not seen before or if set non-optional."""
if dep in self.seen and (optional or not self.seen[dep]):
return False
@@ -474,7 +508,7 @@
self.listed.add(dep)
return True
- def _DeqDep(self):
+ def _DeqDep(self) -> Tuple[str, bool, bool]:
"""Dequeues and returns a dependency, its listed and optional flags.
This returns listed packages first, if any are present, to ensure that
@@ -490,7 +524,7 @@
return dep, listed, self.seen[dep]
- def _FindPackageMatches(self, cpv_pattern):
+ def _FindPackageMatches(self, cpv_pattern: str) -> List[Tuple[str, str]]:
"""Returns list of binpkg (CP, slot) pairs that match |cpv_pattern|.
This is breaking |cpv_pattern| into its C, P and V components, each of
@@ -527,7 +561,7 @@
return matches
- def _FindPackage(self, pkg):
+ def _FindPackage(self, pkg: str) -> Tuple[str, str]:
"""Returns the (CP, slot) pair for a package matching |pkg|.
Args:
@@ -560,7 +594,9 @@
return matches[idx]
- def _NeedsInstall(self, cpv, slot, build_time, optional):
+ def _NeedsInstall(
+ self, cpv: str, slot: str, build_time: int, optional: bool
+ ) -> Tuple[bool, bool, bool]:
"""Returns whether a package needs to be installed on the target.
Args:
@@ -570,38 +606,71 @@
optional: Whether package is optional on the target.
Returns:
- A tuple (install, update) indicating whether to |install| the
- package and whether it is an |update| to an existing package.
+ A tuple (install, update, use_mismatch) indicating whether to
+ |install| the package, whether it is an |update| to an existing
+ package, and whether the package's USE flags mismatch the existing
+ package.
Raises:
ValueError: if slot is not provided.
"""
# If not checking installed packages, always install.
if not self.target_db:
- return True, False
+ return True, False, False
cp = self._GetCP(cpv)
target_pkg_info = self.target_db.get(cp, dict()).get(slot)
if target_pkg_info is not None:
- if cpv != target_pkg_info.cpv:
- attrs = package_info.SplitCPV(cpv)
- target_attrs = package_info.SplitCPV(target_pkg_info.cpv)
- logging.debug(
- "Updating %s: version (%s) different on target (%s)",
- cp,
- attrs.version,
- target_attrs.version,
- )
- return True, True
+ attrs = package_info.SplitCPV(cpv)
+ target_attrs = package_info.SplitCPV(target_pkg_info.cpv)
- if build_time != target_pkg_info.build_time:
+            def _get_attr_mismatch(
+                attr_name: str, new_attr: Any, target_attr: Any
+            ) -> Optional[Tuple[str, Any, Any]]:
+                """Check if the new and target packages differ for an attribute.
+
+                Args:
+                    attr_name: The name of the attribute being checked.
+                    new_attr: The value of the given attribute for the new
+                        package (e.g. a version string or a build time).
+                    target_attr: The value of the given attribute for the target
+                        (existing) package; compared to |new_attr| with `!=`.
+
+                Returns:
+                    A tuple (attr_name, new_attr, target_attr) composed of the
+                    args if there is a mismatch, or None if the values match.
+                """
+                if new_attr != target_attr:
+                    return attr_name, new_attr, target_attr
+                return None
+
+ update_info = _get_attr_mismatch(
+ "version", attrs.version, target_attrs.version
+ ) or _get_attr_mismatch(
+ "build time", build_time, target_pkg_info.build_time
+ )
+
+ if update_info:
+ attr_name, new_attr, target_attr = update_info
logging.debug(
- "Updating %s: build time (%s) different on target (%s)",
- cpv,
- build_time,
- target_pkg_info.build_time,
+ "Updating %s: %s (%s) different on target (%s)",
+ cp,
+ attr_name,
+ new_attr,
+ target_attr,
)
- return True, True
+
+ binpkg_pkg_info = self.binpkgs_db.get(cp, dict()).get(slot)
+ use_mismatch = binpkg_pkg_info.use != target_pkg_info.use
+ if use_mismatch:
+ logging.warning(
+ "USE flags for package %s do not match (Existing='%s', "
+ "New='%s').",
+ cp,
+ target_pkg_info.use,
+ binpkg_pkg_info.use,
+ )
+ return True, True, use_mismatch
logging.debug(
"Not updating %s: already up-to-date (%s, built %s)",
@@ -609,20 +678,20 @@
target_pkg_info.cpv,
target_pkg_info.build_time,
)
- return False, False
+ return False, False, False
if optional:
logging.debug(
"Not installing %s: missing on target but optional", cp
)
- return False, False
+ return False, False, False
logging.debug(
"Installing %s: missing on target and non-optional (%s)", cp, cpv
)
- return True, False
+ return True, False, False
- def _ProcessDeps(self, deps, reverse):
+ def _ProcessDeps(self, deps: List[str], reverse: bool) -> None:
"""Enqueues dependencies for processing.
Args:
@@ -647,7 +716,9 @@
if num_already_seen:
logging.debug("%d dep(s) already seen", num_already_seen)
- def _ComputeInstalls(self, process_rdeps, process_rev_rdeps):
+ def _ComputeInstalls(
+ self, process_rdeps: bool, process_rev_rdeps: bool
+ ) -> Tuple[Dict[str, package_info.CPV], bool]:
"""Returns a dict of packages that need to be installed on the target.
Args:
@@ -655,12 +726,16 @@
process_rev_rdeps: Whether to trace backward dependencies as well.
Returns:
- A dictionary mapping CP values (string) to tuples containing
- a CPV (string), a slot (string), a boolean indicating whether
- the package was initially listed in the queue, and a boolean
- indicating whether this is an update to an existing package.
+ A tuple (installs, warnings_shown) where |installs| is a dictionary
+ mapping CP values (string) to tuples containing a CPV (string), a
+ slot (string), a boolean indicating whether the package was
+ initially listed in the queue, and a boolean indicating whether this
+ is an update to an existing package, and |warnings_shown| is a
+ boolean indicating whether warnings were shown that might require a
+ prompt whether to continue.
"""
installs = {}
+ warnings_shown = False
while self.queue:
dep, listed, optional = self._DeqDep()
cp, required_slot = dep
@@ -683,13 +758,14 @@
num_processed += 1
logging.debug(" Checking %s...", pkg_info.cpv)
- install, update = self._NeedsInstall(
+ install, update, use_mismatch = self._NeedsInstall(
pkg_info.cpv, slot, pkg_info.build_time, optional
)
if not install:
continue
installs[cp] = (pkg_info.cpv, slot, listed, update)
+ warnings_shown |= use_mismatch
# Add forward and backward runtime dependencies to queue.
if process_rdeps:
@@ -704,9 +780,9 @@
"No qualified bintree package corresponding to %s", cp
)
- return installs
+ return installs, warnings_shown
- def _SortInstalls(self, installs):
+ def _SortInstalls(self, installs: List[str]) -> List[str]:
"""Returns a sorted list of packages to install.
Performs a topological sort based on dependencies found in the binary
@@ -725,7 +801,7 @@
curr_path = []
sorted_installs = []
- def SortFrom(cp):
+ def SortFrom(cp: str) -> None:
"""Traverses deps recursively, emitting nodes in reverse order."""
cpv, slot, _, _ = installs[cp]
if cpv in curr_path:
@@ -748,7 +824,7 @@
return sorted_installs
- def _EnqListedPkg(self, pkg):
+ def _EnqListedPkg(self, pkg: str) -> bool:
"""Finds and enqueues a listed package."""
cp, slot = self._FindPackage(pkg)
if cp not in self.binpkgs_db:
@@ -757,7 +833,7 @@
)
self._EnqDep((cp, slot), True, False)
- def _EnqInstalledPkgs(self):
+ def _EnqInstalledPkgs(self) -> None:
"""Enqueues all available binary packages that are already installed."""
for cp, cp_slots in self.binpkgs_db.items():
target_cp_slots = self.target_db.get(cp)
@@ -768,13 +844,13 @@
def Run(
self,
- device,
- root,
- listed_pkgs,
- update,
- process_rdeps,
- process_rev_rdeps,
- ):
+ device: remote_access.RemoteDevice,
+ root: str,
+ listed_pkgs: List[str],
+ update: bool,
+ process_rdeps: bool,
+ process_rev_rdeps: bool,
+ ) -> Tuple[List[str], List[str], int, Dict[str, str], bool]:
"""Computes the list of packages that need to be installed on a target.
Args:
@@ -786,14 +862,16 @@
process_rev_rdeps: Whether to trace backward dependencies as well.
Returns:
- A tuple (sorted, listed, num_updates, install_attrs) where |sorted|
- is a list of package CPVs (string) to install on the target
- in an order that satisfies their inter-dependencies, |listed|
+ A tuple (sorted, listed, num_updates, install_attrs, warnings_shown)
+ where |sorted| is a list of package CPVs (string) to install on the
+ target in an order that satisfies their inter-dependencies, |listed|
the subset that was requested by the user, and |num_updates|
the number of packages being installed over preexisting
versions. Note that installation order should be reversed for
removal, |install_attrs| is a dictionary mapping a package
- CPV (string) to some of its extracted environment attributes.
+ CPV (string) to some of its extracted environment attributes, and
+ |warnings_shown| is a boolean indicating whether warnings were shown
+ that might require a prompt whether to continue.
"""
if process_rev_rdeps and not process_rdeps:
raise ValueError(
@@ -827,7 +905,9 @@
self._EnqListedPkg(pkg)
logging.info("Computing set of packages to install...")
- installs = self._ComputeInstalls(process_rdeps, process_rev_rdeps)
+ installs, warnings_shown = self._ComputeInstalls(
+ process_rdeps, process_rev_rdeps
+ )
num_updates = 0
listed_installs = []
@@ -855,10 +935,21 @@
if dlc_id and dlc_package:
install_attrs[pkg][_DLC_ID] = dlc_id
- return sorted_installs, listed_installs, num_updates, install_attrs
+ return (
+ sorted_installs,
+ listed_installs,
+ num_updates,
+ install_attrs,
+ warnings_shown,
+ )
-def _Emerge(device, pkg_paths, root, extra_args=None):
+def _Emerge(
+ device: remote_access.RemoteDevice,
+ pkg_paths: List[str],
+ root: str,
+ extra_args: List[str] = None,
+) -> str:
"""Copies |pkg_paths| to |device| and emerges them.
Args:
@@ -974,7 +1065,9 @@
logging.notice("Packages have been installed.")
-def _RestoreSELinuxContext(device, pkgpath, root):
+def _RestoreSELinuxContext(
+ device: remote_access.RemoteDevice, pkgpath: str, root: str
+) -> None:
"""Restore SELinux context for files in a given package.
This reads the tarball from pkgpath, and calls restorecon on device to
@@ -1010,7 +1103,9 @@
)
-def _GetPackagesByCPV(cpvs, strip, sysroot):
+def _GetPackagesByCPV(
+ cpvs: List[package_info.CPV], strip: bool, sysroot: str
+) -> List[str]:
"""Returns paths to binary packages corresponding to |cpvs|.
Args:
@@ -1058,7 +1153,7 @@
return paths
-def _GetPackagesPaths(pkgs, strip, sysroot):
+def _GetPackagesPaths(pkgs: List[str], strip: bool, sysroot: str) -> List[str]:
"""Returns paths to binary |pkgs|.
Args:
@@ -1073,7 +1168,9 @@
return _GetPackagesByCPV(cpvs, strip, sysroot)
-def _Unmerge(device, pkgs, root):
+def _Unmerge(
+ device: remote_access.RemoteDevice, pkgs: List[str], root: str
+) -> None:
"""Unmerges |pkgs| on |device|.
Args:
@@ -1107,7 +1204,7 @@
logging.notice("Packages have been uninstalled.")
-def _ConfirmDeploy(num_updates):
+def _ConfirmDeploy(num_updates: int) -> bool:
"""Returns whether we can continue deployment."""
if num_updates > _MAX_UPDATES_NUM:
logging.warning(_MAX_UPDATES_WARNING)
@@ -1116,7 +1213,21 @@
return True
-def _EmergePackages(pkgs, device, strip, sysroot, root, board, emerge_args):
+def _ConfirmUpdateDespiteWarnings() -> bool:
+    """Returns whether we can continue updating despite warnings.
+
+    Logs the question "Continue despite prior warnings?" at warning level and
+    then returns the result of cros_build_lib.BooleanPrompt(default=False).
+    Presumably this blocks on an interactive yes/no answer and treats an
+    empty answer as "no" -- confirm against BooleanPrompt's documentation.
+    """
+    logging.warning("Continue despite prior warnings?")
+    return cros_build_lib.BooleanPrompt(default=False)
+
+
+def _EmergePackages(
+ pkgs: List[str],
+ device: remote_access.RemoteDevice,
+ strip: bool,
+ sysroot: str,
+ root: str,
+ board: str,
+ emerge_args: List[str],
+) -> None:
"""Call _Emerge for each package in pkgs."""
if device.IsSELinuxAvailable():
enforced = device.IsSELinuxEnforced()
@@ -1166,7 +1277,12 @@
device.run(["restart", "dlcservice"])
-def _UnmergePackages(pkgs, device, root, pkgs_attrs):
+def _UnmergePackages(
+ pkgs: List[str],
+ device: remote_access.RemoteDevice,
+ root: str,
+ pkgs_attrs: Dict[str, List[str]],
+) -> str:
"""Call _Unmege for each package in pkgs."""
dlc_uninstalled = False
_Unmerge(device, pkgs, root)
@@ -1181,7 +1297,9 @@
device.run(["restart", "dlcservice"])
-def _UninstallDLCImage(device, pkg_attrs):
+def _UninstallDLCImage(
+ device: remote_access.RemoteDevice, pkg_attrs: Dict[str, List[str]]
+):
"""Uninstall a DLC image."""
if _DLC_ID in pkg_attrs:
dlc_id = pkg_attrs[_DLC_ID]
@@ -1194,7 +1312,13 @@
return False
-def _DeployDLCImage(device, sysroot, board, dlc_id, dlc_package):
+def _DeployDLCImage(
+ device: remote_access.RemoteDevice,
+ sysroot: str,
+ board: str,
+ dlc_id: str,
+ dlc_package: str,
+):
"""Deploy (install and mount) a DLC image.
Args:
@@ -1298,8 +1422,8 @@
def _DeployDLCLoadPin(
- rootfs: os.PathLike, device: remote_access.ChromiumOSDevice
-):
+ rootfs: os.PathLike, device: remote_access.RemoteDevice
+) -> None:
"""Deploy DLC LoadPin from temp rootfs to device.
Args:
@@ -1327,7 +1451,9 @@
)
-def _GetDLCInfo(device, pkg_path, from_dut):
+def _GetDLCInfo(
+ device: remote_access.RemoteDevice, pkg_path: str, from_dut: bool
+) -> Tuple[str, str]:
"""Returns information of a DLC given its package path.
Args:
@@ -1377,22 +1503,22 @@
def Deploy(
- device,
- packages,
- board=None,
- emerge=True,
- update=False,
- deep=False,
- deep_rev=False,
- clean_binpkg=True,
- root="/",
- strip=True,
- emerge_args=None,
- ssh_private_key=None,
- ping=True,
- force=False,
- dry_run=False,
-):
+ device: remote_access.RemoteDevice,
+ packages: List[str],
+ board: str = None,
+ emerge: bool = True,
+ update: bool = False,
+ deep: bool = False,
+ deep_rev: bool = False,
+ clean_binpkg: bool = True,
+ root: str = "/",
+ strip: bool = True,
+ emerge_args: List[str] = None,
+ ssh_private_key: str = None,
+ ping: bool = True,
+ force: bool = False,
+ dry_run: bool = False,
+) -> None:
"""Deploys packages to a device.
Args:
@@ -1474,9 +1600,13 @@
# Obtain list of packages to upgrade/remove.
pkg_scanner = _InstallPackageScanner(sysroot)
- pkgs, listed, num_updates, pkgs_attrs = pkg_scanner.Run(
- device, root, packages, update, deep, deep_rev
- )
+ (
+ pkgs,
+ listed,
+ num_updates,
+ pkgs_attrs,
+ warnings_shown,
+ ) = pkg_scanner.Run(device, root, packages, update, deep, deep_rev)
if emerge:
action_str = "emerge"
else:
@@ -1512,6 +1642,13 @@
if dry_run or not _ConfirmDeploy(num_updates):
return
+ if (
+ warnings_shown
+ and not force
+ and not _ConfirmUpdateDespiteWarnings()
+ ):
+ return
+
# Select function (emerge or unmerge) and bind args.
if emerge:
func = functools.partial(