cli: Warn and prompt when USE flags have changed

When executing `cros deploy --update`, check whether the new package's
USE flags match those from the package already on the DUT. If not, warn
and prompt the user to confirm they want to proceed.

BUG=b:187783171
TEST=./run_tests cli/deploy_unittest.py and `cros deploy` xattr to a DUT

Change-Id: I46a4c2ff01e605122f5e0124bb58755b78e86edf
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/4460194
Tested-by: Tim Bain <tbain@google.com>
Commit-Queue: Tim Bain <tbain@google.com>
Reviewed-by: Ram Chandrasekar <rchandrasekar@google.com>
diff --git a/cli/deploy.py b/cli/deploy.py
index 5e48648..50d04d4 100644
--- a/cli/deploy.py
+++ b/cli/deploy.py
@@ -18,6 +18,7 @@
 import os
 from pathlib import Path
 import tempfile
+from typing import Dict, List, Set, Tuple
 
 from chromite.cli import command
 from chromite.lib import build_target_lib
@@ -80,11 +81,11 @@
         "Please restart any updated",
     )
 
-    def __init__(self, emerge):
+    def __init__(self, emerge: bool):
         """Construct BrilloDeployOperation object.
 
         Args:
-            emerge: True if emerge, False is unmerge.
+            emerge: True if emerge, False if unmerge.
         """
         super().__init__()
         if emerge:
@@ -127,14 +128,28 @@
     class PkgInfo(object):
         """A record containing package information."""
 
-        __slots__ = ("cpv", "build_time", "rdeps_raw", "rdeps", "rev_rdeps")
+        __slots__ = (
+            "cpv",
+            "build_time",
+            "rdeps_raw",
+            "use",
+            "rdeps",
+            "rev_rdeps",
+        )
 
         def __init__(
-            self, cpv, build_time, rdeps_raw, rdeps=None, rev_rdeps=None
+            self,
+            cpv: package_info.CPV,
+            build_time: int,
+            rdeps_raw: str,
+            use: str,
+            rdeps: set = None,
+            rev_rdeps: set = None,
         ):
             self.cpv = cpv
             self.build_time = build_time
             self.rdeps_raw = rdeps_raw
+            self.use = use
             self.rdeps = set() if rdeps is None else rdeps
             self.rev_rdeps = set() if rev_rdeps is None else rev_rdeps
 
@@ -153,14 +168,14 @@
 vartree = trees[target_root]['vartree']
 pkg_info = []
 for cpv in vartree.dbapi.cpv_all():
-  slot, rdep_raw, build_time = vartree.dbapi.aux_get(
-      cpv, ('SLOT', 'RDEPEND', 'BUILD_TIME'))
-  pkg_info.append((cpv, slot, rdep_raw, build_time))
+  slot, rdep_raw, build_time, use = vartree.dbapi.aux_get(
+      cpv, ('SLOT', 'RDEPEND', 'BUILD_TIME', 'USE'))
+  pkg_info.append((cpv, slot, rdep_raw, build_time, use))
 
 print(json.dumps(pkg_info))
 """
 
-    def __init__(self, sysroot):
+    def __init__(self, sysroot: str):
         self.sysroot = sysroot
         # Members containing the sysroot (binpkg) and target (installed) package
         # DB.
@@ -172,7 +187,7 @@
         self.listed = None
 
     @staticmethod
-    def _GetCP(cpv):
+    def _GetCP(cpv: package_info.CPV) -> str:
         """Returns the CP value for a given CPV string."""
         attrs = package_info.SplitCPV(cpv, strict=False)
         if not attrs.cp:
@@ -180,18 +195,18 @@
         return attrs.cp
 
     @staticmethod
-    def _InDB(cp, slot, db):
+    def _InDB(cp: str, slot: str, db: Dict[str, Dict[str, PkgInfo]]) -> bool:
         """Returns whether CP and slot are found in a database (if provided)."""
         cp_slots = db.get(cp) if db else None
         return cp_slots is not None and (not slot or slot in cp_slots)
 
     @staticmethod
-    def _AtomStr(cp, slot):
+    def _AtomStr(cp: str, slot: str) -> str:
         """Returns 'CP:slot' if slot is non-empty, else just 'CP'."""
         return "%s:%s" % (cp, slot) if slot else cp
 
     @classmethod
-    def _GetVartreeSnippet(cls, root="/"):
+    def _GetVartreeSnippet(cls, root: str = "/") -> str:
         """Returns a code snippet for dumping the vartree on the target.
 
         Args:
@@ -203,7 +218,9 @@
         return cls._GET_VARTREE % {"root": root}
 
     @classmethod
-    def _StripDepAtom(cls, dep_atom, installed_db=None):
+    def _StripDepAtom(
+        cls, dep_atom: str, installed_db: Dict[str, Dict[str, PkgInfo]] = None
+    ) -> Tuple[str, str]:
         """Strips a dependency atom and returns a (CP, slot) pair."""
         # TODO(garnold) This is a gross simplification of ebuild dependency
         # semantics, stripping and ignoring various qualifiers (versions, slots,
@@ -248,7 +265,12 @@
         return cp, slot
 
     @classmethod
-    def _ProcessDepStr(cls, dep_str, installed_db, avail_db):
+    def _ProcessDepStr(
+        cls,
+        dep_str: str,
+        installed_db: Dict[str, Dict[str, PkgInfo]],
+        avail_db: Dict[str, Dict[str, PkgInfo]],
+    ) -> set:
         """Resolves and returns a list of dependencies from a dependency string.
 
         This parses a dependency string and returns a list of package names and
@@ -269,7 +291,9 @@
             ValueError: the dependencies string is malformed.
         """
 
-        def ProcessSubDeps(dep_exp, disjunct):
+        def ProcessSubDeps(
+            dep_exp: Set[Tuple[str, str]], disjunct: bool
+        ) -> Set[Tuple[str, str]]:
             """Parses and processes a dependency (sub)expression."""
             deps = set()
             default_deps = set()
@@ -327,8 +351,12 @@
             raise ValueError("%s: %s" % (e, dep_str))
 
     def _BuildDB(
-        self, cpv_info, process_rdeps, process_rev_rdeps, installed_db=None
-    ):
+        self,
+        cpv_info: List[Tuple[Dict[str, package_info.CPV], str, str, int, str]],
+        process_rdeps: bool,
+        process_rev_rdeps: bool,
+        installed_db: Dict[str, Dict[str, PkgInfo]] = None,
+    ) -> Dict[str, Dict[str, PkgInfo]]:
         """Returns a database of packages given a list of CPV info.
 
         Args:
@@ -349,7 +377,7 @@
         """
         db = {}
         logging.debug("Populating package DB...")
-        for cpv, slot, rdeps_raw, build_time in cpv_info:
+        for cpv, slot, rdeps_raw, build_time, use in cpv_info:
             cp = self._GetCP(cpv)
             cp_slots = db.setdefault(cp, dict())
             if slot in cp_slots:
@@ -364,7 +392,7 @@
                 build_time,
                 rdeps_raw,
             )
-            cp_slots[slot] = self.PkgInfo(cpv, build_time, rdeps_raw)
+            cp_slots[slot] = self.PkgInfo(cpv, build_time, rdeps_raw, use)
 
         avail_db = db
         if installed_db is None:
@@ -417,7 +445,13 @@
 
         return db
 
-    def _InitTargetVarDB(self, device, root, process_rdeps, process_rev_rdeps):
+    def _InitTargetVarDB(
+        self,
+        device: remote_access.RemoteDevice,
+        root: str,
+        process_rdeps: bool,
+        process_rev_rdeps: bool,
+    ) -> None:
         """Initializes a dictionary of packages installed on |device|."""
         get_vartree_script = self._GetVartreeSnippet(root)
         try:
@@ -435,7 +469,7 @@
         except ValueError as e:
             raise self.VartreeError(str(e))
 
-    def _InitBinpkgDB(self, process_rdeps):
+    def _InitBinpkgDB(self, process_rdeps: bool) -> None:
         """Initializes a dictionary of binpkgs for updating the target."""
         # Get build root trees; portage indexes require a trailing '/'.
         build_root = os.path.join(self.sysroot, "")
@@ -445,10 +479,10 @@
         bintree = trees[build_root]["bintree"]
         binpkgs_info = []
         for cpv in bintree.dbapi.cpv_all():
-            slot, rdep_raw, build_time = bintree.dbapi.aux_get(
-                cpv, ["SLOT", "RDEPEND", "BUILD_TIME"]
+            slot, rdep_raw, build_time, use = bintree.dbapi.aux_get(
+                cpv, ["SLOT", "RDEPEND", "BUILD_TIME", "USE"]
             )
-            binpkgs_info.append((cpv, slot, rdep_raw, build_time))
+            binpkgs_info.append((cpv, slot, rdep_raw, build_time, use))
 
         try:
             self.binpkgs_db = self._BuildDB(
@@ -457,13 +491,13 @@
         except ValueError as e:
             raise self.BintreeError(str(e))
 
-    def _InitDepQueue(self):
+    def _InitDepQueue(self) -> None:
         """Initializes the dependency work queue."""
         self.queue = set()
         self.seen = {}
         self.listed = set()
 
-    def _EnqDep(self, dep, listed, optional):
+    def _EnqDep(self, dep: str, listed: bool, optional: bool) -> bool:
         """Enqueues a dependency if not seen before or if set non-optional."""
         if dep in self.seen and (optional or not self.seen[dep]):
             return False
@@ -474,7 +508,7 @@
             self.listed.add(dep)
         return True
 
-    def _DeqDep(self):
+    def _DeqDep(self) -> Tuple[str, bool, bool]:
         """Dequeues and returns a dependency, its listed and optional flags.
 
         This returns listed packages first, if any are present, to ensure that
@@ -490,7 +524,7 @@
 
         return dep, listed, self.seen[dep]
 
-    def _FindPackageMatches(self, cpv_pattern):
+    def _FindPackageMatches(self, cpv_pattern: str) -> List[Tuple[str, str]]:
         """Returns list of binpkg (CP, slot) pairs that match |cpv_pattern|.
 
         This is breaking |cpv_pattern| into its C, P and V components, each of
@@ -527,7 +561,7 @@
 
         return matches
 
-    def _FindPackage(self, pkg):
+    def _FindPackage(self, pkg: str) -> Tuple[str, str]:
         """Returns the (CP, slot) pair for a package matching |pkg|.
 
         Args:
@@ -560,7 +594,9 @@
 
         return matches[idx]
 
-    def _NeedsInstall(self, cpv, slot, build_time, optional):
+    def _NeedsInstall(
+        self, cpv: str, slot: str, build_time: int, optional: bool
+    ) -> Tuple[bool, bool, bool]:
         """Returns whether a package needs to be installed on the target.
 
         Args:
@@ -570,38 +606,71 @@
             optional: Whether package is optional on the target.
 
         Returns:
-            A tuple (install, update) indicating whether to |install| the
-            package and whether it is an |update| to an existing package.
+            A tuple (install, update, use_mismatch) indicating whether to
+            |install| the package, whether it is an |update| to an existing
+            package, and whether the package's USE flags mismatch the existing
+            package.
 
         Raises:
             ValueError: if slot is not provided.
         """
         # If not checking installed packages, always install.
         if not self.target_db:
-            return True, False
+            return True, False, False
 
         cp = self._GetCP(cpv)
         target_pkg_info = self.target_db.get(cp, dict()).get(slot)
         if target_pkg_info is not None:
-            if cpv != target_pkg_info.cpv:
-                attrs = package_info.SplitCPV(cpv)
-                target_attrs = package_info.SplitCPV(target_pkg_info.cpv)
-                logging.debug(
-                    "Updating %s: version (%s) different on target (%s)",
-                    cp,
-                    attrs.version,
-                    target_attrs.version,
-                )
-                return True, True
+            attrs = package_info.SplitCPV(cpv)
+            target_attrs = package_info.SplitCPV(target_pkg_info.cpv)
 
-            if build_time != target_pkg_info.build_time:
+            def _get_attr_mismatch(
+                attr_name: str, new_attr: any, target_attr: any
+            ) -> Tuple[str, str, str]:
+                """Check if the new and target packages differ for an attribute.
+
+                Args:
+                    attr_name: The name of the attribute being checked.
+                    new_attr: The value of the given attribute for the new
+                        package.
+                    target_attr: The value of the given attribute for the target
+                        (existing) package.
+
+                Returns:
+                    A tuple (attr_name, new_attr, target_attr) composed of the
+                    args if there is a mismatch, or None if the values match.
+                """
+                mismatch = new_attr != target_attr
+                if mismatch:
+                    return attr_name, new_attr, target_attr
+
+            update_info = _get_attr_mismatch(
+                "version", attrs.version, target_attrs.version
+            ) or _get_attr_mismatch(
+                "build time", build_time, target_pkg_info.build_time
+            )
+
+            if update_info:
+                attr_name, new_attr, target_attr = update_info
                 logging.debug(
-                    "Updating %s: build time (%s) different on target (%s)",
-                    cpv,
-                    build_time,
-                    target_pkg_info.build_time,
+                    "Updating %s: %s (%s) different on target (%s)",
+                    cp,
+                    attr_name,
+                    new_attr,
+                    target_attr,
                 )
-                return True, True
+
+                binpkg_pkg_info = self.binpkgs_db.get(cp, dict()).get(slot)
+                use_mismatch = binpkg_pkg_info.use != target_pkg_info.use
+                if use_mismatch:
+                    logging.warning(
+                        "USE flags for package %s do not match (Existing='%s', "
+                        "New='%s').",
+                        cp,
+                        target_pkg_info.use,
+                        binpkg_pkg_info.use,
+                    )
+                return True, True, use_mismatch
 
             logging.debug(
                 "Not updating %s: already up-to-date (%s, built %s)",
@@ -609,20 +678,20 @@
                 target_pkg_info.cpv,
                 target_pkg_info.build_time,
             )
-            return False, False
+            return False, False, False
 
         if optional:
             logging.debug(
                 "Not installing %s: missing on target but optional", cp
             )
-            return False, False
+            return False, False, False
 
         logging.debug(
             "Installing %s: missing on target and non-optional (%s)", cp, cpv
         )
-        return True, False
+        return True, False, False
 
-    def _ProcessDeps(self, deps, reverse):
+    def _ProcessDeps(self, deps: List[str], reverse: bool) -> None:
         """Enqueues dependencies for processing.
 
         Args:
@@ -647,7 +716,9 @@
         if num_already_seen:
             logging.debug("%d dep(s) already seen", num_already_seen)
 
-    def _ComputeInstalls(self, process_rdeps, process_rev_rdeps):
+    def _ComputeInstalls(
+        self, process_rdeps: bool, process_rev_rdeps: bool
+    ) -> Tuple[Dict[str, package_info.CPV], bool]:
         """Returns a dict of packages that need to be installed on the target.
 
         Args:
@@ -655,12 +726,16 @@
             process_rev_rdeps: Whether to trace backward dependencies as well.
 
         Returns:
-            A dictionary mapping CP values (string) to tuples containing
-            a CPV (string), a slot (string), a boolean indicating whether
-            the package was initially listed in the queue, and a boolean
-            indicating whether this is an update to an existing package.
+            A tuple (installs, warnings_shown) where |installs| is a dictionary
+            mapping CP values (string) to tuples containing a CPV (string), a
+            slot (string), a boolean indicating whether the package was
+            initially listed in the queue, and a boolean indicating whether this
+            is an update to an existing package; |warnings_shown| is a boolean
+            indicating whether any warnings were logged that may warrant
+            prompting the user before continuing.
         """
         installs = {}
+        warnings_shown = False
         while self.queue:
             dep, listed, optional = self._DeqDep()
             cp, required_slot = dep
@@ -683,13 +758,14 @@
                 num_processed += 1
                 logging.debug(" Checking %s...", pkg_info.cpv)
 
-                install, update = self._NeedsInstall(
+                install, update, use_mismatch = self._NeedsInstall(
                     pkg_info.cpv, slot, pkg_info.build_time, optional
                 )
                 if not install:
                     continue
 
                 installs[cp] = (pkg_info.cpv, slot, listed, update)
+                warnings_shown |= use_mismatch
 
                 # Add forward and backward runtime dependencies to queue.
                 if process_rdeps:
@@ -704,9 +780,9 @@
                     "No qualified bintree package corresponding to %s", cp
                 )
 
-        return installs
+        return installs, warnings_shown
 
-    def _SortInstalls(self, installs):
+    def _SortInstalls(self, installs: List[str]) -> List[str]:
         """Returns a sorted list of packages to install.
 
         Performs a topological sort based on dependencies found in the binary
@@ -725,7 +801,7 @@
         curr_path = []
         sorted_installs = []
 
-        def SortFrom(cp):
+        def SortFrom(cp: str) -> None:
             """Traverses deps recursively, emitting nodes in reverse order."""
             cpv, slot, _, _ = installs[cp]
             if cpv in curr_path:
@@ -748,7 +824,7 @@
 
         return sorted_installs
 
-    def _EnqListedPkg(self, pkg):
+    def _EnqListedPkg(self, pkg: str) -> bool:
         """Finds and enqueues a listed package."""
         cp, slot = self._FindPackage(pkg)
         if cp not in self.binpkgs_db:
@@ -757,7 +833,7 @@
             )
         self._EnqDep((cp, slot), True, False)
 
-    def _EnqInstalledPkgs(self):
+    def _EnqInstalledPkgs(self) -> None:
         """Enqueues all available binary packages that are already installed."""
         for cp, cp_slots in self.binpkgs_db.items():
             target_cp_slots = self.target_db.get(cp)
@@ -768,13 +844,13 @@
 
     def Run(
         self,
-        device,
-        root,
-        listed_pkgs,
-        update,
-        process_rdeps,
-        process_rev_rdeps,
-    ):
+        device: remote_access.RemoteDevice,
+        root: str,
+        listed_pkgs: List[str],
+        update: bool,
+        process_rdeps: bool,
+        process_rev_rdeps: bool,
+    ) -> Tuple[List[str], List[str], int, Dict[str, str], bool]:
         """Computes the list of packages that need to be installed on a target.
 
         Args:
@@ -786,14 +862,16 @@
             process_rev_rdeps: Whether to trace backward dependencies as well.
 
         Returns:
-            A tuple (sorted, listed, num_updates, install_attrs) where |sorted|
-            is a list of package CPVs (string) to install on the target
-            in an order that satisfies their inter-dependencies, |listed|
+            A tuple (sorted, listed, num_updates, install_attrs, warnings_shown)
+            where |sorted| is a list of package CPVs (string) to install on the
+            target in an order that satisfies their inter-dependencies, |listed|
             the subset that was requested by the user, and |num_updates|
             the number of packages being installed over preexisting
             versions. Note that installation order should be reversed for
             removal, |install_attrs| is a dictionary mapping a package
-            CPV (string) to some of its extracted environment attributes.
+            CPV (string) to some of its extracted environment attributes, and
+            |warnings_shown| is a boolean indicating whether any warnings were
+            logged that may warrant prompting the user before continuing.
         """
         if process_rev_rdeps and not process_rdeps:
             raise ValueError(
@@ -827,7 +905,9 @@
                 self._EnqListedPkg(pkg)
 
         logging.info("Computing set of packages to install...")
-        installs = self._ComputeInstalls(process_rdeps, process_rev_rdeps)
+        installs, warnings_shown = self._ComputeInstalls(
+            process_rdeps, process_rev_rdeps
+        )
 
         num_updates = 0
         listed_installs = []
@@ -855,10 +935,21 @@
             if dlc_id and dlc_package:
                 install_attrs[pkg][_DLC_ID] = dlc_id
 
-        return sorted_installs, listed_installs, num_updates, install_attrs
+        return (
+            sorted_installs,
+            listed_installs,
+            num_updates,
+            install_attrs,
+            warnings_shown,
+        )
 
 
-def _Emerge(device, pkg_paths, root, extra_args=None):
+def _Emerge(
+    device: remote_access.RemoteDevice,
+    pkg_paths: List[str],
+    root: str,
+    extra_args: List[str] = None,
+) -> str:
     """Copies |pkg_paths| to |device| and emerges them.
 
     Args:
@@ -974,7 +1065,9 @@
         logging.notice("Packages have been installed.")
 
 
-def _RestoreSELinuxContext(device, pkgpath, root):
+def _RestoreSELinuxContext(
+    device: remote_access.RemoteDevice, pkgpath: str, root: str
+) -> None:
     """Restore SELinux context for files in a given package.
 
     This reads the tarball from pkgpath, and calls restorecon on device to
@@ -1010,7 +1103,9 @@
     )
 
 
-def _GetPackagesByCPV(cpvs, strip, sysroot):
+def _GetPackagesByCPV(
+    cpvs: List[package_info.CPV], strip: bool, sysroot: str
+) -> List[str]:
     """Returns paths to binary packages corresponding to |cpvs|.
 
     Args:
@@ -1058,7 +1153,7 @@
     return paths
 
 
-def _GetPackagesPaths(pkgs, strip, sysroot):
+def _GetPackagesPaths(pkgs: List[str], strip: bool, sysroot: str) -> List[str]:
     """Returns paths to binary |pkgs|.
 
     Args:
@@ -1073,7 +1168,9 @@
     return _GetPackagesByCPV(cpvs, strip, sysroot)
 
 
-def _Unmerge(device, pkgs, root):
+def _Unmerge(
+    device: remote_access.RemoteDevice, pkgs: List[str], root: str
+) -> None:
     """Unmerges |pkgs| on |device|.
 
     Args:
@@ -1107,7 +1204,7 @@
         logging.notice("Packages have been uninstalled.")
 
 
-def _ConfirmDeploy(num_updates):
+def _ConfirmDeploy(num_updates: int) -> bool:
     """Returns whether we can continue deployment."""
     if num_updates > _MAX_UPDATES_NUM:
         logging.warning(_MAX_UPDATES_WARNING)
@@ -1116,7 +1213,21 @@
     return True
 
 
-def _EmergePackages(pkgs, device, strip, sysroot, root, board, emerge_args):
+def _ConfirmUpdateDespiteWarnings() -> bool:
+    """Returns whether we can continue updating despite warnings."""
+    logging.warning("Continue despite prior warnings?")
+    return cros_build_lib.BooleanPrompt(default=False)
+
+
+def _EmergePackages(
+    pkgs: List[str],
+    device: remote_access.RemoteDevice,
+    strip: bool,
+    sysroot: str,
+    root: str,
+    board: str,
+    emerge_args: List[str],
+) -> None:
     """Call _Emerge for each package in pkgs."""
     if device.IsSELinuxAvailable():
         enforced = device.IsSELinuxEnforced()
@@ -1166,7 +1277,12 @@
         device.run(["restart", "dlcservice"])
 
 
-def _UnmergePackages(pkgs, device, root, pkgs_attrs):
+def _UnmergePackages(
+    pkgs: List[str],
+    device: remote_access.RemoteDevice,
+    root: str,
+    pkgs_attrs: Dict[str, List[str]],
+) -> str:
     """Call _Unmege for each package in pkgs."""
     dlc_uninstalled = False
     _Unmerge(device, pkgs, root)
@@ -1181,7 +1297,9 @@
         device.run(["restart", "dlcservice"])
 
 
-def _UninstallDLCImage(device, pkg_attrs):
+def _UninstallDLCImage(
+    device: remote_access.RemoteDevice, pkg_attrs: Dict[str, List[str]]
+):
     """Uninstall a DLC image."""
     if _DLC_ID in pkg_attrs:
         dlc_id = pkg_attrs[_DLC_ID]
@@ -1194,7 +1312,13 @@
         return False
 
 
-def _DeployDLCImage(device, sysroot, board, dlc_id, dlc_package):
+def _DeployDLCImage(
+    device: remote_access.RemoteDevice,
+    sysroot: str,
+    board: str,
+    dlc_id: str,
+    dlc_package: str,
+):
     """Deploy (install and mount) a DLC image.
 
     Args:
@@ -1298,8 +1422,8 @@
 
 
 def _DeployDLCLoadPin(
-    rootfs: os.PathLike, device: remote_access.ChromiumOSDevice
-):
+    rootfs: os.PathLike, device: remote_access.RemoteDevice
+) -> None:
     """Deploy DLC LoadPin from temp rootfs to device.
 
     Args:
@@ -1327,7 +1451,9 @@
             )
 
 
-def _GetDLCInfo(device, pkg_path, from_dut):
+def _GetDLCInfo(
+    device: remote_access.RemoteDevice, pkg_path: str, from_dut: bool
+) -> Tuple[str, str]:
     """Returns information of a DLC given its package path.
 
     Args:
@@ -1377,22 +1503,22 @@
 
 
 def Deploy(
-    device,
-    packages,
-    board=None,
-    emerge=True,
-    update=False,
-    deep=False,
-    deep_rev=False,
-    clean_binpkg=True,
-    root="/",
-    strip=True,
-    emerge_args=None,
-    ssh_private_key=None,
-    ping=True,
-    force=False,
-    dry_run=False,
-):
+    device: remote_access.RemoteDevice,
+    packages: List[str],
+    board: str = None,
+    emerge: bool = True,
+    update: bool = False,
+    deep: bool = False,
+    deep_rev: bool = False,
+    clean_binpkg: bool = True,
+    root: str = "/",
+    strip: bool = True,
+    emerge_args: List[str] = None,
+    ssh_private_key: str = None,
+    ping: bool = True,
+    force: bool = False,
+    dry_run: bool = False,
+) -> None:
     """Deploys packages to a device.
 
     Args:
@@ -1474,9 +1600,13 @@
 
             # Obtain list of packages to upgrade/remove.
             pkg_scanner = _InstallPackageScanner(sysroot)
-            pkgs, listed, num_updates, pkgs_attrs = pkg_scanner.Run(
-                device, root, packages, update, deep, deep_rev
-            )
+            (
+                pkgs,
+                listed,
+                num_updates,
+                pkgs_attrs,
+                warnings_shown,
+            ) = pkg_scanner.Run(device, root, packages, update, deep, deep_rev)
             if emerge:
                 action_str = "emerge"
             else:
@@ -1512,6 +1642,13 @@
             if dry_run or not _ConfirmDeploy(num_updates):
                 return
 
+            if (
+                warnings_shown
+                and not force
+                and not _ConfirmUpdateDespiteWarnings()
+            ):
+                return
+
             # Select function (emerge or unmerge) and bind args.
             if emerge:
                 func = functools.partial(