(1 of 2) Refactor and move Autotest downloader to the Dev Server.

Added two new Dev Server handlers:
1. Given GS url, download and install the build image.
2. Return the control file for a particular build.

BUG=chromium-os:22954
TEST=manual

Change-Id: Id3914a5ee429bea8c9b64fe74a21958459669a20
Reviewed-on: https://gerrit.chromium.org/gerrit/12802
Commit-Ready: Frank Farzan <frankf@chromium.org>
Reviewed-by: Frank Farzan <frankf@chromium.org>
Tested-by: Frank Farzan <frankf@chromium.org>
diff --git a/devserver_util_test.py b/devserver_util_test.py
new file mode 100644
index 0000000..dd09082
--- /dev/null
+++ b/devserver_util_test.py
@@ -0,0 +1,260 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unit tests for devserver_util module."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import devserver_util
+
+
+# Fake Dev Server Layout:
+TEST_LAYOUT = {
+    'test-board-1': ['R17-1413.0.0-a1-b1346', 'R17-18.0.0-a1-b1346'],
+    'test-board-2': ['R16-2241.0.0-a0-b2', 'R17-2.0.0-a1-b1346']
+}
+
+
+class DevServerUtilTest(unittest.TestCase):
+
+  def setUp(self):
+    self._static_dir = tempfile.mkdtemp()
+    self._outside_sandbox_dir = tempfile.mkdtemp()
+
+    for board, builds in TEST_LAYOUT.iteritems():
+      board_path = os.path.join(self._static_dir, board)
+      os.mkdir(board_path)
+      for build in builds:
+        build_path = os.path.join(board_path, build)
+        os.mkdir(build_path)
+        with open(os.path.join(build_path,
+                               devserver_util.TEST_IMAGE), 'w') as f:
+          f.write('TEST_IMAGE')
+        with open(os.path.join(build_path,
+                               devserver_util.STATEFUL_UPDATE), 'w') as f:
+          f.write('STATEFUL_UPDATE')
+        with open(os.path.join(build_path,
+                               devserver_util.ROOT_UPDATE), 'w') as f:
+          f.write('ROOT_UPDATE')
+        # AU payloads.
+        au_dir = os.path.join(build_path, devserver_util.AU_BASE)
+        nton_dir = os.path.join(au_dir, build + devserver_util.NTON_DIR_SUFFIX)
+        os.makedirs(nton_dir)
+        with open(os.path.join(nton_dir,
+                               devserver_util.ROOT_UPDATE), 'w') as f:
+          f.write('ROOT_UPDATE')
+        mton_dir = os.path.join(au_dir, build + devserver_util.MTON_DIR_SUFFIX)
+        os.makedirs(mton_dir)
+        with open(os.path.join(mton_dir,
+                               devserver_util.ROOT_UPDATE), 'w') as f:
+          f.write('ROOT_UPDATE')
+
+  def tearDown(self):
+    shutil.rmtree(self._static_dir)
+    shutil.rmtree(self._outside_sandbox_dir)
+
+  def testParsePayloadList(self):
+    archive_url_prefix = ('gs://chromeos-image-archive/x86-mario-release/'
+                          'R17-1413.0.0-a1-b1346/')
+    mton_url = (archive_url_prefix + 'chromeos_R17-1412.0.0-a1-b1345_'
+                'R17-1413.0.0-a1_x86-mario_delta_dev.bin')
+    nton_url = (archive_url_prefix + 'chromeos_R17-1413.0.0-a1_'
+                'R17-1413.0.0-a1_x86-mario_delta_dev.bin')
+    full_url = (archive_url_prefix + 'chromeos_R17-1413.0.0-a1_'
+                'x86-mario_full_dev.bin')
+    full_url_out, nton_url_out, mton_url_out = (
+        devserver_util.ParsePayloadList([full_url, nton_url, mton_url]))
+    self.assertEqual([full_url, nton_url, mton_url],
+                     [full_url_out, nton_url_out, mton_url_out])
+
+    archive_url_prefix = ('gs://chromeos-image-archive/x86-alex_he-release/'
+                          'R18-1420.0.0-a1-b541')
+    mton_url = (archive_url_prefix + 'chromeos_R18-1418.0.0-a1-b540_'
+                'R18-1420.0.0-a1_x86-alex_he_delta_dev.bin')
+    nton_url = (archive_url_prefix + 'chromeos_R18-1420.0.0-a1_'
+                'R18-1420.0.0-a1_x86-alex_he_delta_dev.bin')
+    full_url = (archive_url_prefix + 'chromeos_R18-1420.0.0-a1_'
+                'x86-alex_he_full_dev.bin')
+    full_url_out, nton_url_out, mton_url_out = (
+        devserver_util.ParsePayloadList([full_url, nton_url, mton_url]))
+    self.assertEqual([full_url, nton_url, mton_url],
+                     [full_url_out, nton_url_out, mton_url_out])
+
+  def testDownloadBuildFromGS(self):
+    self.fail('Not implemented.')
+
+  def testInstallBuild(self):
+    self.fail('Not implemented.')
+
+  def testPrepareAutotestPkgs(self):
+    self.fail('Not implemented.')
+
+  def testSafeSandboxAccess(self):
+    # Path is in sandbox.
+    self.assertTrue(
+        devserver_util.SafeSandboxAccess(
+            self._static_dir, os.path.join(self._static_dir, 'some-board')))
+
+    # Path is sandbox.
+    self.assertFalse(
+        devserver_util.SafeSandboxAccess(self._static_dir, self._static_dir))
+
+    # Path is outside the sandbox.
+    self.assertFalse(
+        devserver_util.SafeSandboxAccess(self._static_dir,
+                                         self._outside_sandbox_dir))
+
+    # Path contains '..'.
+    self.assertFalse(
+        devserver_util.SafeSandboxAccess(
+            self._static_dir, os.path.join(self._static_dir, os.pardir)))
+
+    # Path contains symbolic link references.
+    os.chdir(self._static_dir)
+    os.symlink(os.pardir, 'parent')
+    self.assertFalse(
+        devserver_util.SafeSandboxAccess(
+            self._static_dir, os.path.join(self._static_dir, os.pardir)))
+
+  def testAcquireReleaseLocks(self):
+    # Successful lock and unlock.
+    lock_file = devserver_util.AcquireLock(self._static_dir, 'test-lock')
+    self.assertTrue(os.path.exists(lock_file))
+    devserver_util.ReleaseLock(self._static_dir, 'test-lock')
+    self.assertFalse(os.path.exists(lock_file))
+
+    # Attempt to lock an existing directory.
+    devserver_util.AcquireLock(self._static_dir, 'test-lock')
+    self.assertRaises(devserver_util.DevServerUtilError,
+                      devserver_util.AcquireLock, self._static_dir, 'test-lock')
+
+  def testFindMatchingBoards(self):
+    for key in TEST_LAYOUT:
+      # Partial match with multiple boards.
+      self.assertEqual(
+          set(devserver_util.FindMatchingBoards(self._static_dir, key[:-5])),
+          set(TEST_LAYOUT.keys()))
+
+      # Absolute match.
+      self.assertEqual(
+          devserver_util.FindMatchingBoards(self._static_dir, key), [key])
+
+    # Invalid partial match.
+    self.assertEqual(
+        devserver_util.FindMatchingBoards(self._static_dir, 'asdfsadf'), [])
+
+  def testFindMatchingBuilds(self):
+    # Try a partial board and build match with single match.
+    self.assertEqual(
+        devserver_util.FindMatchingBuilds(self._static_dir, 'test-board',
+                                          'R17-1413'),
+        [('test-board-1', 'R17-1413.0.0-a1-b1346')])
+
+    # Try a partial board and build match with multiple match.
+    actual = set(devserver_util.FindMatchingBuilds(
+        self._static_dir, 'test-board', 'R17'))
+    expected = set([('test-board-1', 'R17-1413.0.0-a1-b1346'),
+                    ('test-board-1', 'R17-18.0.0-a1-b1346'),
+                    ('test-board-2', 'R17-2.0.0-a1-b1346')])
+    self.assertEqual(actual, expected)
+
+  def testGetLatestBuildVersion(self):
+    self.assertEqual(
+        devserver_util.GetLatestBuildVersion(self._static_dir, 'test-board-1'),
+        'R17-1413.0.0-a1-b1346')
+
+  def testFindBuild(self):
+    # Ensure no matching boards raises exception for latest.
+    self.assertRaises(
+        devserver_util.DevServerUtilError, devserver_util.FindBuild,
+        self._static_dir, 'aasdf', 'latest')
+
+    # Ensure no matching builds or boards raises an exception.
+    self.assertRaises(
+        devserver_util.DevServerUtilError, devserver_util.FindBuild,
+        self._static_dir, 'aasdf', 'asdgsadg')
+
+    # Ensure latest returns the proper build.
+    self.assertEqual(
+        devserver_util.FindBuild(self._static_dir, 'test-board-1', 'latest'),
+        ('test-board-1', 'R17-1413.0.0-a1-b1346'))
+
+    # Ensure specific board, build is returned.
+    self.assertEqual(
+        devserver_util.FindBuild(self._static_dir, 'test-board-1',
+                                 'R17-18.0.0-a1-b1346'),
+        ('test-board-1', 'R17-18.0.0-a1-b1346'))
+
+    # Ensure too many matches raises an exception.
+    self.assertRaises(
+        devserver_util.DevServerUtilError, devserver_util.FindBuild,
+        self._static_dir, 'test-board', 'R17')
+
+  def testCloneBuild(self):
+    test_prefix = 'abc'
+    test_tag = test_prefix + '/123'
+    abc_path = os.path.join(self._static_dir, devserver_util.DEV_BUILD_PREFIX,
+                            test_tag)
+
+    os.mkdir(os.path.join(self._static_dir, test_prefix))
+
+    # Verify leaf path is created and proper values returned.
+    board, builds = TEST_LAYOUT.items()[0]
+    dev_build = devserver_util.CloneBuild(self._static_dir, board, builds[0],
+                                          test_tag)
+    self.assertEquals(dev_build, abc_path)
+    self.assertTrue(os.path.exists(abc_path))
+    self.assertTrue(os.path.isfile(os.path.join(
+        abc_path, devserver_util.TEST_IMAGE)))
+    self.assertTrue(os.path.isfile(os.path.join(
+        abc_path, devserver_util.ROOT_UPDATE)))
+    self.assertTrue(os.path.isfile(os.path.join(
+        abc_path, devserver_util.STATEFUL_UPDATE)))
+
+    # Verify force properly removes the old directory.
+    junk_path = os.path.join(dev_build, 'junk')
+    with open(junk_path, 'w') as f:
+      f.write('hello!')
+    remote_dir = devserver_util.CloneBuild(
+        self._static_dir, board, builds[0], test_tag, force=True)
+    self.assertEquals(remote_dir, abc_path)
+    self.assertTrue(os.path.exists(abc_path))
+    self.assertTrue(os.path.isfile(os.path.join(
+        abc_path, devserver_util.TEST_IMAGE)))
+    self.assertTrue(os.path.isfile(os.path.join(
+        abc_path, devserver_util.ROOT_UPDATE)))
+    self.assertTrue(os.path.isfile(os.path.join(
+        abc_path, devserver_util.STATEFUL_UPDATE)))
+    self.assertFalse(os.path.exists(junk_path))
+
+  def testGetControlFile(self):
+    control_file_dir = os.path.join(
+        self._static_dir, 'test-board-1', 'R17-1413.0.0-a1-b1346', 'server',
+        'site_tests', 'network_VPN')
+    os.makedirs(control_file_dir)
+    with open(os.path.join(control_file_dir, 'control'), 'w') as f:
+      f.write('hello!')
+
+    control_content = devserver_util.GetControlFile(
+        self._static_dir, 'test-board-1', 'R17-1413.0.0-a1-b1346',
+        os.path.join('server', 'site_tests', 'network_VPN', 'control'))
+    self.assertEqual(control_content, 'hello!')
+
+  def testListAutoupdateTargets(self):
+    for board, builds in TEST_LAYOUT.iteritems():
+      for build in builds:
+        au_targets = devserver_util.ListAutoupdateTargets(self._static_dir,
+                                                          board, build)
+        self.assertEqual(set(au_targets),
+                         set([build + devserver_util.NTON_DIR_SUFFIX,
+                              build + devserver_util.MTON_DIR_SUFFIX]))
+
+
+if __name__ == '__main__':
+  unittest.main()