Support for rlz_brand_code and customization_id in factory flow.

BUG=chrome-os-partner:28445
TEST=Unit tests, manually tested on device

Change-Id: I3a0ac1b0aa3ab06a914e3015aa93533ad62243c7
Reviewed-on: https://chromium-review.googlesource.com/197475
Reviewed-by: Jon Salz <jsalz@chromium.org>
Commit-Queue: Jon Salz <jsalz@chromium.org>
Tested-by: Jon Salz <jsalz@chromium.org>
diff --git a/py/gooftool/__init__.py b/py/gooftool/__init__.py
index 736a719..ccdf722 100644
--- a/py/gooftool/__init__.py
+++ b/py/gooftool/__init__.py
@@ -28,6 +28,8 @@
 from cros.factory.privacy import FilterDict
 from cros.factory.rule import Context
 from cros.factory.system import vpd
+from cros.factory.test import branding
+from cros.factory.tools.mount_partition import MountPartition
 from cros.factory.utils.process_utils import CheckOutput, GetLines
 from cros.factory.utils.string_utils import ParseDict
 
@@ -404,6 +406,51 @@
     if any(tpm_status[k] != v for k, v in tpm_cleared_status.iteritems()):
       raise Error, 'TPM is not cleared.'
 
+  def VerifyBranding(self):
+    """Verify that branding fields are properly set.
+
+    Returns:
+      A dictionary containing rlz_brand_code and customization_id fields,
+      for testing.
+    """
+    ro_vpd = vpd.ro.GetAll()
+
+    customization_id = ro_vpd.get('customization_id')
+    logging.info('RO VPD customization_id: %r', customization_id)
+    if customization_id is not None:
+      if not branding.CUSTOMIZATION_ID_REGEXP.match(customization_id):
+        raise ValueError('Bad format for customization_id %r in RO VPD '
+                         '(expected it to match regexp %r)' % (
+            customization_id, branding.CUSTOMIZATION_ID_REGEXP.pattern))
+
+    rlz_brand_code = ro_vpd.get('rlz_brand_code')
+
+    logging.info('RO VPD rlz_brand_code: %r', rlz_brand_code)
+    if rlz_brand_code is None:
+      # It must be present as BRAND_CODE_PATH in rootfs.
+      with MountPartition(
+          self._util.GetReleaseRootPartitionPath()) as mount_path:
+        path = os.path.join(mount_path, branding.BRAND_CODE_PATH.lstrip('/'))
+        if not os.path.exists(path):
+          raise ValueError('rlz_brand_code is not present in RO VPD, and %s '
+                           'does not exist in release rootfs' % (
+              branding.BRAND_CODE_PATH))
+        with open(path) as f:
+          rlz_brand_code = f.read().strip()
+          logging.info('rlz_brand_code from rootfs: %r', rlz_brand_code)
+      rlz_brand_code_source = 'release_rootfs'
+    else:
+      rlz_brand_code_source = 'RO VPD'
+
+    if not branding.RLZ_BRAND_CODE_REGEXP.match(rlz_brand_code):
+      raise ValueError('Bad format for rlz_brand_code %r in %s '
+                       '(expected it to match regexp %r)' % (
+          rlz_brand_code, rlz_brand_code_source,
+          branding.CUSTOMIZATION_ID_REGEXP.pattern))
+
+    return dict(rlz_brand_code=rlz_brand_code,
+                customization_id=customization_id)
+
   def ClearGBBFlags(self):
     """Zero out the GBB flags, in preparation for transition to release state.
 
diff --git a/py/gooftool/gooftool.py b/py/gooftool/gooftool.py
index 8fbf247..971197c 100755
--- a/py/gooftool/gooftool.py
+++ b/py/gooftool/gooftool.py
@@ -544,6 +544,18 @@
     event_log.Log('switch_dev', type='virtual switch')
 
 
+@Command('verify_branding')
+def VerifyBranding(options):  # pylint: disable=W0613
+  """Verify that branding fields are properly set.
+
+  customization_id, if set in the RO VPD, must be of the correct format.
+
+  rlz_brand_code must be set either in the RO VPD or OEM partition, and must
+  be of the correct format.
+  """
+  return GetGooftool(options).VerifyBranding()
+
+
 @Command('write_protect')
 def EnableFwWp(options):  # pylint: disable=W0613
   """Enable then verify firmware write protection."""
@@ -654,6 +666,7 @@
   VerifyKeys(options)
   VerifyRootFs(options)
   VerifyTPM(options)
+  VerifyBranding(options)
 
 @Command('untar_stateful_files')
 def UntarStatefulFiles(dummy_options):
diff --git a/py/gooftool/gooftool_unittest.py b/py/gooftool/gooftool_unittest.py
index ca850da..22f0bb8 100755
--- a/py/gooftool/gooftool_unittest.py
+++ b/py/gooftool/gooftool_unittest.py
@@ -13,6 +13,7 @@
 import unittest
 
 from collections import namedtuple
+from contextlib import contextmanager
 from tempfile import NamedTemporaryFile
 
 import factory_common  # pylint: disable=W0611
@@ -27,6 +28,9 @@
 from cros.factory.hwdb.hwid_tool import ProbeResults  # pylint: disable=E0611
 from cros.factory.gooftool import Mismatch
 from cros.factory.gooftool import ProbedComponentResult
+from cros.factory.system import vpd
+from cros.factory.test import branding
+from cros.factory.utils import file_utils
 from cros.factory.utils.process_utils import CheckOutput
 
 _TEST_DATA_PATH = os.path.join(os.path.dirname(__file__), 'testdata')
@@ -356,6 +360,79 @@
     self._gooftool.VerifyWPSwitch()
     self.assertRaises(Error, self._gooftool.VerifyWPSwitch)
 
+  def _SetupBrandingMocks(self, ro_vpd, fake_rootfs_path):
+    """Set up mocks for VerifyBranding tests.
+
+    Args:
+      ro_vpd: The dictionary to use for the RO VPD.
+      fake_rootfs_path: A path at which we pretend to mount the release rootfs.
+    """
+
+    # Fake partition to return from MountPartition mock.
+    @contextmanager
+    def MockPartition(path):
+      yield path
+
+    self.mox.StubOutWithMock(vpd.ro, "GetAll")
+    self.mox.StubOutWithMock(gooftool, "MountPartition")
+
+    vpd.ro.GetAll().AndReturn(ro_vpd)
+    if fake_rootfs_path:
+      # Pretend that '/dev/rel' is the release rootfs path.
+      self._gooftool._util.GetReleaseRootPartitionPath().AndReturn('/dev/rel')
+      # When '/dev/rel' is mounted, return a context manager yielding
+      # fake_rootfs_path.
+      gooftool.MountPartition('/dev/rel').AndReturn(
+          MockPartition(fake_rootfs_path))
+
+  def testVerifyBranding_NoBrandCode(self):
+    self._SetupBrandingMocks({}, '/doesntexist')
+    self.mox.ReplayAll()
+    # Should fail, since rlz_brand_code isn't present anywhere
+    self.assertRaisesRegexp(ValueError, 'rlz_brand_code is not present',
+                            self._gooftool.VerifyBranding)
+
+  def testVerifyBranding_AllInVPD(self):
+    self._SetupBrandingMocks(
+        dict(rlz_brand_code='ABCD', customization_id='FOO'), None)
+    self.mox.ReplayAll()
+    self.assertEquals(dict(rlz_brand_code='ABCD', customization_id='FOO'),
+                      self._gooftool.VerifyBranding())
+
+  def testVerifyBranding_BrandCodeInVPD(self):
+    self._SetupBrandingMocks(dict(rlz_brand_code='ABCD'), None)
+    self.mox.ReplayAll()
+    self.assertEquals(dict(rlz_brand_code='ABCD', customization_id=None),
+                      self._gooftool.VerifyBranding())
+
+  def testVerifyBranding_BrandCodeInRootFS(self):
+    with file_utils.TempDirectory() as tmp:
+      # Create a /opt/oem/etc/BRAND_CODE file within the fake mounted rootfs.
+      rlz_brand_code_path = os.path.join(
+          tmp, branding.BRAND_CODE_PATH.lstrip('/'))
+      file_utils.TryMakeDirs(os.path.dirname(rlz_brand_code_path))
+      with open(rlz_brand_code_path, 'w') as f:
+        f.write('ABCD')
+
+      self._SetupBrandingMocks({}, tmp)
+      self.mox.ReplayAll()
+      self.assertEquals(dict(rlz_brand_code='ABCD', customization_id=None),
+                        self._gooftool.VerifyBranding())
+
+  def testVerifyBranding_BadBrandCode(self):
+    self._SetupBrandingMocks(dict(rlz_brand_code='ABCDx',
+                                  customization_id='FOO'), None)
+    self.mox.ReplayAll()
+    self.assertRaisesRegexp(ValueError, 'Bad format for rlz_brand_code',
+                            self._gooftool.VerifyBranding)
+
+  def testVerifyBranding_BadConfigurationId(self):
+    self._SetupBrandingMocks(dict(rlz_brand_code='ABCD',
+                                  customization_id='FOOx'), None)
+    self.mox.ReplayAll()
+    self.assertRaisesRegexp(ValueError, 'Bad format for customization_id',
+                            self._gooftool.VerifyBranding)
+
   def testCheckDevSwitchForDisabling(self):
     # 1st call: virtual switch
     self._gooftool._util.GetVBSharedDataFlags().AndReturn(0x400)
diff --git a/py/test/branding.py b/py/test/branding.py
new file mode 100644
index 0000000..1f2edb4
--- /dev/null
+++ b/py/test/branding.py
@@ -0,0 +1,17 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+"""Constants for branding parameters (rlz_brand_id, customization_id)."""
+
+
+import re
+
+
+RLZ_BRAND_CODE_REGEXP = re.compile('^[A-Z]{4}$')
+CUSTOMIZATION_ID_REGEXP = re.compile('^[A-Z0-9]+(-[A-Z0-9]+)?$')
+
+BRAND_CODE_PATH = '/opt/oem/etc/BRAND_CODE'
diff --git a/py/test/pytests/vpd.py b/py/test/pytests/vpd.py
index 8995dc1..a4a12c9 100644
--- a/py/test/pytests/vpd.py
+++ b/py/test/pytests/vpd.py
@@ -31,6 +31,7 @@
 
 from cros.factory import cros_locale
 from cros.factory.l10n.regions import REGIONS
+from cros.factory.test import branding
 from cros.factory.test import factory
 from cros.factory.test import registration_codes
 from cros.factory.test import shopfloor
@@ -99,6 +100,10 @@
 
 _REGEX_TYPE = type(re.compile(''))
 
+# String to indicate that rlz_brand_code and customization_id should
+# come from device data.
+FROM_DEVICE_DATA = 'FROM_DEVICE_DATA'
+
 class WriteVPDTask(FactoryTask):
   """A task to write VPD.
 
@@ -383,8 +388,24 @@
         'all the possible values will be used to let user select a value from '
         'it.', default=[], optional=True),
     Arg('allow_multiple_l10n', bool, 'True to allow multiple locales and '
-        'keyboards.  Only supported only in M34+ FSIs, so this is disabled '
+        'keyboards.  Fully supported only in M35+ FSIs, so this is disabled '
         'by default', default=False, optional=True),
+    Arg('rlz_brand_code', str,
+        'RLZ brand code to write to RO VPD.  This may be any of:\n'
+        '\n'
+        '- A fixed string\n'
+        '- None, to not set any value at all\n'
+        '- The string `"FROM_DEVICE_DATA"`, to use a value obtained from\n'
+        '  device data.',
+        default=None, optional=True),
+    Arg('customization_id', str,
+        'Customization ID to write to RO VPD.  This may be any of:\n'
+        '\n'
+        '- A fixed string\n'
+        '- None, to not set any value at all\n'
+        '- The string `"FROM_DEVICE_DATA"`, to use a value obtained from\n'
+        '  device data.',
+        default=None, optional=True),
   ]
 
   def _ReadShopFloorDeviceData(self):
@@ -466,7 +487,40 @@
         for vpd_section, key_value_dict in (
             self.args.override_vpd_entries.iteritems()):
           self.vpd[vpd_section].update(key_value_dict)
+
+    self.ReadBrandingFields()
+
     self.tasks += [WriteVPDTask(self)]
 
+  def ReadBrandingFields(self):
+    cached_device_data = None
+
+    for attr, regexp in (
+      ('rlz_brand_code', branding.RLZ_BRAND_CODE_REGEXP),
+      ('customization_id', branding.CUSTOMIZATION_ID_REGEXP)):
+      arg_value = getattr(self.args, attr)
+
+      if arg_value is None:
+        continue
+
+      if arg_value == FROM_DEVICE_DATA:
+        if cached_device_data is None:
+          cached_device_data = shopfloor.GetDeviceData()
+        value = cached_device_data.get(attr)
+        if value is None:
+          raise ValueError('%s not present in device data' % attr)
+      else:
+        # Fixed string; just use the value directly.
+        value = arg_value
+
+      # Check the format.
+      if not regexp.match(value):
+        raise ValueError('Bad format for %s %r '
+                         '(expected it to match regexp %r)' % (
+            attr, value, regexp.pattern))
+
+      # We're good to go!
+      self.vpd['ro'][attr] = value
+
   def runTest(self):
     FactoryTaskManager(self.ui, self.tasks).Run()
diff --git a/py/test/pytests/vpd_unittest.py b/py/test/pytests/vpd_unittest.py
new file mode 100755
index 0000000..e52cf59
--- /dev/null
+++ b/py/test/pytests/vpd_unittest.py
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import mox
+import unittest
+
+import factory_common  # pylint: disable=W0611
+
+from cros.factory.common import Obj
+from cros.factory.test import shopfloor
+from cros.factory.test.pytests import vpd
+
+
+class VPDBrandingFieldsTest(unittest.TestCase):
+  def setUp(self):
+    self.test_case = vpd.VPDTest()
+    self.test_case.vpd = dict(ro={})
+    self.device_data = {}
+    self.mox = mox.Mox()
+    self.mox.StubOutWithMock(shopfloor, 'GetDeviceData')
+
+  def tearDown(self):
+    self.mox.VerifyAll()
+    self.mox.UnsetStubs()
+
+  def testFixed(self):
+    self.test_case.args = Obj(rlz_brand_code='ABCD', customization_id='FOO')
+    self.mox.ReplayAll()
+    self.test_case.ReadBrandingFields()
+    self.assertEquals(dict(rlz_brand_code='ABCD', customization_id='FOO'),
+                      self.test_case.vpd['ro'])
+
+  def testBrandCodeOnly(self):
+    self.test_case.args = Obj(rlz_brand_code='ABCD', customization_id=None)
+    self.mox.ReplayAll()
+    self.test_case.ReadBrandingFields()
+    self.assertEquals(dict(rlz_brand_code='ABCD'), self.test_case.vpd['ro'])
+
+  def testConfigurationIdOnly(self):
+    self.test_case.args = Obj(rlz_brand_code=None, customization_id='FOO')
+    self.mox.ReplayAll()
+    self.test_case.ReadBrandingFields()
+    self.assertEquals(dict(customization_id='FOO'), self.test_case.vpd['ro'])
+
+  def testFromShopFloor(self):
+    self.test_case.args = Obj(rlz_brand_code=vpd.FROM_DEVICE_DATA,
+                              customization_id=vpd.FROM_DEVICE_DATA)
+    shopfloor.GetDeviceData().AndReturn(dict(rlz_brand_code='ABCD',
+                                             customization_id='FOO-BAR'))
+    self.mox.ReplayAll()
+    self.test_case.ReadBrandingFields()
+    self.assertEquals(dict(rlz_brand_code='ABCD', customization_id='FOO-BAR'),
+                      self.test_case.vpd['ro'])
+
+  def testBadBrandCode(self):
+    self.test_case.args = Obj(rlz_brand_code='ABCDx')
+    self.mox.ReplayAll()
+    self.assertRaisesRegexp(ValueError, 'Bad format for rlz_brand_code',
+                            self.test_case.ReadBrandingFields)
+
+  def testBadConfigurationId(self):
+    self.test_case.args = Obj(rlz_brand_code=None, customization_id='FOO-BARx')
+    self.mox.ReplayAll()
+    self.assertRaisesRegexp(ValueError, 'Bad format for customization_id',
+                            self.test_case.ReadBrandingFields)
+
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/py/test/test_lists/generic.py b/py/test/test_lists/generic.py
index 60449f4..a4cdd86 100644
--- a/py/test/test_lists/generic.py
+++ b/py/test/test_lists/generic.py
@@ -82,6 +82,18 @@
   min_charge_pct = 87
   max_charge_pct = 88
 
+  # A value that may be used for rlz_brand_code or customization_id to indicate
+  # that these values should be read from device data.
+  FROM_DEVICE_DATA = 'FROM_DEVICE_DATA'
+
+  # How to obtain the rlz_brand_code and customization_id VPD values.
+  # See the "Branding" page in the documentation bundle for more
+  # information.  For testing, you can use rlz_brand_code = 'ZZCR' and
+  # customization_id = None.  Note that this is only supported in M35
+  # and above.
+  rlz_brand_code = None
+  customization_id = None
+
   #####
   #
   # Parameters for SMT (surface-mount technology) tests.
diff --git a/py/test/test_lists/generic_run_in.py b/py/test/test_lists/generic_run_in.py
index 4011d9c..d1ec85b 100644
--- a/py/test/test_lists/generic_run_in.py
+++ b/py/test/test_lists/generic_run_in.py
@@ -304,6 +304,8 @@
             pytest_name='vpd',
             dargs=dict(
                 use_shopfloor_device_data=True,
+                rlz_brand_code=args.rlz_brand_code,
+                customization_id=args.customization_id,
                 extra_device_data_fields=[('ro', 'color')]))
 
         # For 3G model only. Some modem can only do testing in Generic UMTS