Alex Klein | c05f3d1 | 2019-05-29 14:16:21 -0600 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| 3 | # Use of this source code is governed by a BSD-style license that can be |
| 4 | # found in the LICENSE file. |
| 5 | |
| 6 | """field_handler module tests.""" |
| 7 | |
| 8 | from __future__ import print_function |
| 9 | |
| 10 | import os |
| 11 | |
| 12 | from chromite.api import field_handler |
| 13 | from chromite.api.gen.chromite.api import build_api_test_pb2 |
| 14 | from chromite.api.gen.chromiumos import common_pb2 |
| 15 | from chromite.lib import chroot_lib |
| 16 | from chromite.lib import cros_test_lib |
| 17 | from chromite.lib import osutils |
| 18 | |
| 19 | |
| 20 | class ChrootHandlerTest(cros_test_lib.TestCase): |
| 21 | """ChrootHandler tests.""" |
| 22 | |
| 23 | def setUp(self): |
| 24 | self.path = '/chroot/dir' |
| 25 | self.cache_dir = '/cache/dir' |
| 26 | self.env = {'FEATURES': 'thing'} |
| 27 | self.expected_chroot = chroot_lib.Chroot(self.path, self.cache_dir, |
| 28 | self.env) |
| 29 | |
| 30 | def test_parse_chroot_success(self): |
| 31 | """Test successful Chroot message parse.""" |
| 32 | chroot_msg = common_pb2.Chroot() |
| 33 | chroot_msg.path = self.path |
| 34 | chroot_msg.cache_dir = self.cache_dir |
| 35 | chroot_msg.env.features.add().feature = 'thing' |
| 36 | |
| 37 | chroot_handler = field_handler.ChrootHandler(clear_field=False) |
| 38 | parsed_chroot = chroot_handler.parse_chroot(chroot_msg) |
| 39 | |
| 40 | self.assertEqual(self.expected_chroot, parsed_chroot) |
| 41 | |
| 42 | def test_handle_success(self): |
| 43 | """Test a successful Chroot message parse from a parent message.""" |
| 44 | message = build_api_test_pb2.TestRequestMessage() |
| 45 | message.chroot.path = self.path |
| 46 | message.chroot.cache_dir = self.cache_dir |
| 47 | message.chroot.env.features.add().feature = 'thing' |
| 48 | |
| 49 | # First a no-clear parse. |
| 50 | chroot_handler = field_handler.ChrootHandler(clear_field=False) |
| 51 | chroot = chroot_handler.handle(message) |
| 52 | |
| 53 | self.assertEqual(self.expected_chroot, chroot) |
| 54 | self.assertEqual(message.chroot.path, self.path) |
| 55 | |
| 56 | # A clear field parse. |
| 57 | clear_chroot_handler = field_handler.ChrootHandler(clear_field=True) |
| 58 | chroot = clear_chroot_handler.handle(message) |
| 59 | |
| 60 | self.assertEqual(self.expected_chroot, chroot) |
| 61 | self.assertFalse(message.chroot.path) |
| 62 | |
| 63 | def test_handle_empty_chroot_message(self): |
| 64 | """Test handling of an empty chroot message.""" |
| 65 | message = build_api_test_pb2.TestRequestMessage() |
| 66 | empty_chroot = chroot_lib.Chroot(env={'FEATURES': 'separatedebug'}) |
| 67 | |
| 68 | chroot_handler = field_handler.ChrootHandler(clear_field=False) |
| 69 | chroot = chroot_handler.handle(message) |
| 70 | |
| 71 | self.assertEqual(empty_chroot, chroot) |
| 72 | |
| 73 | |
| 74 | class PathHandlerTest(cros_test_lib.TempDirTestCase): |
| 75 | """PathHandler tests.""" |
| 76 | |
| 77 | def setUp(self): |
| 78 | self.source_dir = os.path.join(self.tempdir, 'source') |
| 79 | self.dest_dir = os.path.join(self.tempdir, 'destination') |
| 80 | osutils.SafeMakedirs(self.source_dir) |
| 81 | osutils.SafeMakedirs(self.dest_dir) |
| 82 | |
| 83 | self.source_file1 = os.path.join(self.source_dir, 'file1') |
| 84 | self.file1_contents = 'file 1' |
| 85 | osutils.WriteFile(self.source_file1, self.file1_contents) |
| 86 | |
| 87 | self.file2_contents = 'some data' |
| 88 | self.source_file2 = os.path.join(self.source_dir, 'file2') |
| 89 | osutils.WriteFile(self.source_file2, self.file2_contents) |
| 90 | |
| 91 | def _path_checks(self, source_file, dest_file, contents=None): |
| 92 | """Set of common checks for the copied files/directories.""" |
| 93 | # Message should now reflect the new path. |
| 94 | self.assertNotEqual(source_file, dest_file) |
| 95 | # The new path should be in the destination directory. |
| 96 | self.assertStartsWith(dest_file, self.dest_dir) |
| 97 | # The new file should exist. |
| 98 | self.assertExists(dest_file) |
| 99 | |
| 100 | if contents: |
| 101 | # The contents should be the same as the source file. |
| 102 | self.assertFileContents(dest_file, contents) |
| 103 | |
| 104 | def test_handle_file(self): |
| 105 | """Test handling of a single file.""" |
| 106 | message = build_api_test_pb2.TestRequestMessage() |
| 107 | message.path.path = self.source_file1 |
| 108 | message.path.location = common_pb2.Path.OUTSIDE |
| 109 | |
| 110 | with field_handler.handle_paths(message, self.dest_dir, delete=True): |
| 111 | new_path = message.path.path |
| 112 | self._path_checks(self.source_file1, new_path, self.file1_contents) |
| 113 | |
| 114 | # The file should have been deleted on exit with delete=True. |
| 115 | self.assertNotExists(new_path) |
| 116 | |
| 117 | def test_handle_files(self): |
| 118 | """Test handling of multiple files.""" |
| 119 | message = build_api_test_pb2.TestRequestMessage() |
| 120 | message.path.path = self.source_file1 |
| 121 | message.path.location = common_pb2.Path.OUTSIDE |
| 122 | message.another_path.path = self.source_file2 |
| 123 | message.another_path.location = common_pb2.Path.OUTSIDE |
| 124 | |
| 125 | with field_handler.handle_paths(message, self.dest_dir, delete=False): |
| 126 | new_path1 = message.path.path |
| 127 | new_path2 = message.another_path.path |
| 128 | |
| 129 | self._path_checks(self.source_file1, new_path1, self.file1_contents) |
| 130 | self._path_checks(self.source_file2, new_path2, self.file2_contents) |
| 131 | |
| 132 | # The files should still exist with delete=False. |
| 133 | self.assertExists(new_path1) |
| 134 | self.assertExists(new_path2) |
| 135 | |
| 136 | def test_handle_directory(self): |
| 137 | """Test handling of a directory.""" |
| 138 | message = build_api_test_pb2.TestRequestMessage() |
| 139 | message.path.path = self.source_dir |
| 140 | message.path.location = common_pb2.Path.OUTSIDE |
| 141 | |
| 142 | with field_handler.handle_paths(message, self.dest_dir): |
| 143 | new_path = message.path.path |
| 144 | |
| 145 | self._path_checks(self.source_dir, self.dest_dir) |
| 146 | # Make sure both directories have the same files. |
| 147 | self.assertItemsEqual(os.listdir(self.source_dir), os.listdir(new_path)) |
| 148 | |
| 149 | def test_direction(self): |
| 150 | """Test the direction argument preventing copies.""" |
| 151 | message = build_api_test_pb2.TestRequestMessage() |
| 152 | message.path.path = self.source_file1 |
| 153 | message.path.location = common_pb2.Path.INSIDE |
| 154 | |
| 155 | direction = field_handler.PathHandler.INSIDE |
| 156 | with field_handler.handle_paths(message, self.dest_dir, delete=True, |
| 157 | direction=direction): |
| 158 | self.assertEqual(self.source_file1, message.path.path) |
| 159 | |
| 160 | # It should not be deleting the file when it doesn't need to copy it even |
| 161 | # with delete=True. |
| 162 | self.assertExists(self.source_file1) |