thunderbolt_loopback: Add retimer firmware check

Partner mentioned, that they will be flashing firmware of certain
version for the Burnside Bridge Retimer on Mass Production / shipping
devices.

We add a check to the factory test process to ensure that the retimer
firmware is actually the above mentioned version.

The retimer firmware version is readable from sysfs.

BUG=b:181360981, b:187264683
TEST=run FFT.TBTLoopback01
TEST=run FFT.TBTLoopback03

Change-Id: Ic71743470807e27091fa700f4059bfae5d89ca2f
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/factory/+/2886582
Reviewed-by: Yilin Yang (kerker) <kerker@chromium.org>
Reviewed-by: Ting-Hsuan Wang <phoebewang@chromium.org>
Tested-by: Cheng Yueh <cyueh@chromium.org>
Commit-Queue: Cheng Yueh <cyueh@chromium.org>
diff --git a/py/test/pytests/thunderbolt_loopback.py b/py/test/pytests/thunderbolt_loopback.py
index 3a6691c..8c1d607 100644
--- a/py/test/pytests/thunderbolt_loopback.py
+++ b/py/test/pytests/thunderbolt_loopback.py
@@ -64,6 +64,7 @@
   }
 """
 
+from distutils import version
 import logging
 import os
 import re
@@ -76,6 +77,7 @@
 from cros.factory.test.env import paths
 from cros.factory.test.i18n import _
 from cros.factory.testlog import testlog
+from cros.factory.test.rules import phase
 from cros.factory.test import server_proxy
 from cros.factory.test import session
 from cros.factory.test import test_case
@@ -85,6 +87,7 @@
 
 
 _LOOPBACK_TEST_PATH = '/sys/kernel/debug/thunderbolt'
+_RETIMER_VERSION_PATH = '/sys/bus/thunderbolt/devices/0-0:%s.1/nvm_version'
 _CONTROLLER_PORTS = ('0-1.*', '0-3.*', '1-1.*', '1-3.*')
 _RE_ADP = re.compile(r'^.*\d+-(\d+)\.\d+$')
 _RE_MARGIN_LOOPBACK = re.compile(
@@ -150,6 +153,12 @@
       Arg('lane_margining', bool, 'Collet lane margining data.', default=False),
       Arg('lane_margining_timeout_secs', (int, float),
           'Timeout for colleting lane margining data.', default=10),
+      Arg('min_retimer_version', str,
+          ('The minimum Retimer firmware version. Set to null to disable the '
+           'check.'), default=None),
+      Arg('max_retimer_version', str,
+          ('The maximum Retimer firmware version. Set to null to disable the '
+           'check.'), default=None),
   ]
 
   def setUp(self):
@@ -286,12 +295,13 @@
     logging.info('echo %s > %s', content, filename)
     self._dut.WriteFile(filename, content)
 
-  def _TestLaneMargining(self, device_path, log_result):
-    match = _RE_ADP.match(device_path)
-    if not match:
-      raise Exception('device_path is not in expected format.')
-    ADP = match.group(1)
-    session.console.info('The ADP is at %r.', ADP)
+  def _TestLaneMargining(self, ADP: str, log_result: dict):
+    """Uses tdtl tool to collect lane margining data.
+
+    Args:
+      ADP: A string we pass to tdtl tool.
+      log_result: A dict to save the result.
+    """
     log_result.update({'ADP': int(ADP)})
     # self._dut.CheckOutput do not support env and timeout
     # process_utils.Spawn do not support timeout
@@ -373,6 +383,36 @@
     self.ui.SetState(_('Insert the loopback card.'))
     device_path = sync_utils.WaitFor(self._FindLoopbackPath,
                                      self.args.timeout_secs, poll_interval=0.5)
+    match = _RE_ADP.match(device_path)
+    if not match:
+      raise Exception('device_path is not in expected format.')
+    ADP = match.group(1)
+    session.console.info('The ADP is at %r.', ADP)
+
+    phase.AssertStartingAtPhase(phase.PVT, self.args.min_retimer_version,
+                                'min_retimer_version must be specified.')
+    retimer_version = None
+    if self.args.min_retimer_version or self.args.max_retimer_version:
+      retimer_version_path = _RETIMER_VERSION_PATH % ADP
+      logging.info('cat %s', retimer_version_path)
+      # We need to wait 20 seconds. See b/181360981#comment6.
+      version_string = sync_utils.WaitFor(
+          lambda: self._dut.ReadFile(retimer_version_path), 21, poll_interval=1)
+      retimer_version = version.LooseVersion(version_string.strip())
+      logging.info('retimer_version %s', retimer_version)
+
+    if self.args.min_retimer_version:
+      min_retimer_version = version.LooseVersion(self.args.min_retimer_version)
+      if retimer_version < min_retimer_version:
+        raise RuntimeError('retimer_version %s < min_retimer_version %s' %
+                           (retimer_version, min_retimer_version))
+
+    if self.args.max_retimer_version:
+      max_retimer_version = version.LooseVersion(self.args.max_retimer_version)
+      if retimer_version > max_retimer_version:
+        raise RuntimeError('retimer_version %s > max_retimer_version %s' %
+                           (retimer_version, max_retimer_version))
+
     self.ui.SetState(_('Test is in progress, please do not move the device.'))
     session.console.info('The loopback card path is at %r.', device_path)
     device_test_path = self._dut.path.join(device_path, _DMA_TEST)
@@ -416,7 +456,7 @@
       try:
         stop_lane_margining_timer = self.ui.StartCountdownTimer(
             self.args.lane_margining_timeout_secs)
-        self._TestLaneMargining(device_path, log_result)
+        self._TestLaneMargining(ADP, log_result)
         stop_lane_margining_timer.set()
       except subprocess.TimeoutExpired:
         logging.exception('_TestLaneMargining timeout')