blob: c6156c54969bfc62321296a2ec1a18591db8fe49 [file] [log] [blame]
Takuto Ikutaf48103c2019-10-24 05:23:19 +00001#!/usr/bin/env vpython3
maruel@chromium.org561d4b22013-09-26 21:08:08 +00002# coding=utf-8
maruelea586f32016-04-05 11:11:33 -07003# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07004# Use of this source code is governed under the Apache License, Version 2.0
5# that can be found in the LICENSE file.
maruel@chromium.org561d4b22013-09-26 21:08:08 +00006
maruel9cdd7612015-12-02 13:40:52 -08007import getpass
Takuto Ikutaf48103c2019-10-24 05:23:19 +00008import io
maruel@chromium.org561d4b22013-09-26 21:08:08 +00009import os
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040010import subprocess
maruel@chromium.org561d4b22013-09-26 21:08:08 +000011import sys
Marc-Antoine Ruel76cfcee2019-04-01 23:16:36 +000012import tempfile
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040013import time
Junji Watanabe16d51522020-07-01 01:59:07 +000014import unittest
15
Marc-Antoine Ruel76cfcee2019-04-01 23:16:36 +000016# Mutates sys.path.
17import test_env
maruel@chromium.org561d4b22013-09-26 21:08:08 +000018
Marc-Antoine Ruel76cfcee2019-04-01 23:16:36 +000019# third_party/
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -050020from depot_tools import auto_stub
Marc-Antoine Ruel76cfcee2019-04-01 23:16:36 +000021
maruel@chromium.org561d4b22013-09-26 21:08:08 +000022from utils import file_path
maruel4e732992015-10-16 10:17:21 -070023from utils import fs
Junji Watanabedff19e62021-08-10 02:22:11 +000024from utils import subprocess42
maruel@chromium.org561d4b22013-09-26 21:08:08 +000025
26
def write_content(filepath, content):
  """Writes |content| to |filepath|, opening it in binary mode via fs.open.

  Args:
    filepath: destination path handed to fs.open().
    content: payload passed as-is to the file object's write(); the file is
        opened with mode 'wb', so callers in this file pass bytes.
  """
  with fs.open(filepath, 'wb') as out:
    out.write(content)
30
31
class FilePathTest(auto_stub.TestCase):
  """Exercises utils.file_path helpers against a lazily created temp directory.

  The temporary directory is only created when a test first reads
  self.tempdir, and is made writable again and removed in tearDown().
  Platform specific tests are defined conditionally at class-creation time.
  """

  def setUp(self):
    super().setUp()
    # Created on demand by the tempdir property.
    self._tempdir = None

  def tearDown(self):
    """Removes the temp directory, then runs the base class tearDown."""
    try:
      if self._tempdir:
        # Tests leave read-only files and directories behind. Walk top-down so
        # that each directory has set_read_only(..., False) applied before
        # fs.walk() descends into it.
        for dirpath, dirnames, filenames in fs.walk(
            self._tempdir, topdown=True):
          for name in filenames + dirnames:
            file_path.set_read_only(os.path.join(dirpath, name), False)
        file_path.rmtree(self._tempdir)
    finally:
      super().tearDown()

  @property
  def tempdir(self):
    """Path of the per-test temporary directory, created on first access."""
    if not self._tempdir:
      self._tempdir = tempfile.mkdtemp(prefix='file_path_test')
    return self._tempdir

  def test_atomic_replace_new_file(self):
    """atomic_replace() creates the file and leaves no other entry behind."""
    path = os.path.join(self.tempdir, 'new_file')
    file_path.atomic_replace(path, b'blah')
    with open(path, 'rb') as f:
      self.assertEqual(b'blah', f.read())
    self.assertEqual(['new_file'], os.listdir(self.tempdir))

  def test_atomic_replace_existing_file(self):
    """atomic_replace() overwrites a file and leaves no other entry behind."""
    path = os.path.join(self.tempdir, 'existing_file')
    with open(path, 'wb') as f:
      f.write(b'existing body')
    file_path.atomic_replace(path, b'new body')
    with open(path, 'rb') as f:
      self.assertEqual(b'new body', f.read())
    self.assertEqual(['existing_file'], os.listdir(self.tempdir))

  def assertFileMode(self, filepath, mode, umask=None):
    """Asserts that st_mode of |filepath| equals |mode| with |umask| cleared.

    Args:
      filepath: path passed to fs.stat().
      mode: full expected st_mode, including the file type bits.
      umask: bits removed from |mode| before comparing; defaults to
          test_env.umask() when None.
    """
    umask = test_env.umask() if umask is None else umask
    actual = fs.stat(filepath).st_mode
    expected = mode & ~umask  # pylint: disable=invalid-unary-operand-type
    self.assertEqual(
        expected,
        actual,
        (filepath, oct(expected), oct(actual), oct(umask)))

  def assertMaskedFileMode(self, filepath, mode):
    """It's usually when the file was first marked read only."""
    # Uses a fixed mask (0 on Windows, 0o77 elsewhere) instead of the umask
    # reported by test_env.
    self.assertFileMode(filepath, mode, 0 if sys.platform == 'win32' else 0o77)

  def test_delete_wd_rf(self):
    # Confirms that a RO file in a RW directory can be deleted on non-Windows.
    dir_foo = os.path.join(self.tempdir, 'foo')
    file_bar = os.path.join(dir_foo, 'bar')
    fs.mkdir(dir_foo, 0o777)
    write_content(file_bar, b'bar')
    file_path.set_read_only(dir_foo, False)
    file_path.set_read_only(file_bar, True)
    self.assertFileMode(dir_foo, 0o40777)
    self.assertMaskedFileMode(file_bar, 0o100444)
    if sys.platform == 'win32':
      # On Windows, a read-only file can't be deleted.
      with self.assertRaises(OSError):
        fs.remove(file_bar)
    else:
      fs.remove(file_bar)

  def test_delete_rd_wf(self):
    # Confirms that a RW file in a RO directory can be deleted on Windows only.
    dir_foo = os.path.join(self.tempdir, 'foo')
    file_bar = os.path.join(dir_foo, 'bar')
    fs.mkdir(dir_foo, 0o777)
    write_content(file_bar, b'bar')
    file_path.set_read_only(dir_foo, True)
    file_path.set_read_only(file_bar, False)
    self.assertMaskedFileMode(dir_foo, 0o40555)
    self.assertFileMode(file_bar, 0o100666)
    if sys.platform == 'win32':
      # A read-only directory has a convoluted meaning on Windows, it means that
      # the directory is "personalized". This is used as a signal by Windows
      # Explorer to tell it to look into the directory for desktop.ini.
      # See http://support.microsoft.com/kb/326549 for more details.
      # As such, it is important to not try to set the read-only bit on
      # directories on Windows since it has no effect other than trigger
      # Windows Explorer to look for desktop.ini, which is unnecessary.
      fs.remove(file_bar)
    else:
      with self.assertRaises(OSError):
        fs.remove(file_bar)

  def test_delete_rd_rf(self):
    # Confirms that a RO file in a RO directory can't be deleted.
    dir_foo = os.path.join(self.tempdir, 'foo')
    file_bar = os.path.join(dir_foo, 'bar')
    fs.mkdir(dir_foo, 0o777)
    write_content(file_bar, b'bar')
    file_path.set_read_only(dir_foo, True)
    file_path.set_read_only(file_bar, True)
    self.assertMaskedFileMode(dir_foo, 0o40555)
    self.assertMaskedFileMode(file_bar, 0o100444)
    with self.assertRaises(OSError):
      # It fails for different reason depending on the OS. See the test cases
      # above.
      fs.remove(file_bar)

  def test_hard_link_mode(self):
    # Creates a hard link, see if the file mode changed on the node or the
    # directory entry.
    dir_foo = os.path.join(self.tempdir, 'foo')
    file_bar = os.path.join(dir_foo, 'bar')
    file_link = os.path.join(dir_foo, 'link')
    fs.mkdir(dir_foo, 0o777)
    write_content(file_bar, b'bar')
    file_path.hardlink(file_bar, file_link)
    self.assertFileMode(file_bar, 0o100666)
    self.assertFileMode(file_link, 0o100666)
    file_path.set_read_only(file_bar, True)
    self.assertMaskedFileMode(file_bar, 0o100444)
    self.assertMaskedFileMode(file_link, 0o100444)
    # This is bad news for Windows; on Windows, the file must be writeable to be
    # deleted, but the file node is modified. This means that every hard links
    # must be reset to be read-only after deleting one of the hard link
    # directory entry.

  def test_ensure_tree(self):
    """ensure_tree() creates the directory and tolerates being called twice."""
    dir_foo = os.path.join(self.tempdir, 'foo')
    file_path.ensure_tree(dir_foo, 0o777)

    self.assertTrue(os.path.isdir(dir_foo))

    # Do not raise OSError with errno.EEXIST
    file_path.ensure_tree(dir_foo, 0o777)

  def _make_tree(self):
    """Creates root/child/grand_child, each holding one empty file.

    Returns:
      Tuple (root, dirs, files) where dirs lists the three directories from
      the outermost to the innermost and files lists the three file paths.
    """
    root = os.path.join(self.tempdir, 'root')
    child_dir = os.path.join(root, 'child')
    grand_child_dir = os.path.join(child_dir, 'grand_child')
    dirs = [root, child_dir, grand_child_dir]
    os.makedirs(grand_child_dir)
    files = [
        os.path.join(root, 'file1'),
        os.path.join(child_dir, 'file2'),
        os.path.join(grand_child_dir, 'file3'),
    ]
    for f in files:
      # Create an empty file; the handle is closed by the with statement.
      with open(f, 'w'):
        pass
    return root, dirs, files

  def _mock_failing_fs_rmtree(self, paths):
    """Replaces fs.rmtree with a fake that reports errors for locked paths.

    Each call of the fake invokes onerror() once for every entry of |paths|
    that is not yet present in the returned set.

    Args:
      paths: paths that are considered undeletable until unlocked.

    Returns:
      The (initially empty) set of unlocked paths. Tests add to it from their
      own mocks to emulate a successful permission fix.
    """
    can_delete = set()

    def fs_rmtree_mock(_path, onerror):
      for p in paths:
        if p not in can_delete:
          onerror(None, None, (None, None, None))

    self.mock(fs, 'rmtree', fs_rmtree_mock)
    return can_delete

  @unittest.skipIf(sys.platform == 'win32', 'posix only')
  def test_rmtree(self):
    """rmtree() completes when chmod unlocks the directories."""
    root, dirs, _ = self._make_tree()

    # Emulate fs.rmtree() permission error.
    can_delete = self._mock_failing_fs_rmtree(dirs)

    def chmod_mock(path, _mode):
      can_delete.add(path)

    if hasattr(os, 'lchmod'):
      self.mock(fs, 'lchmod', chmod_mock)
    else:
      self.mock(fs, 'chmod', chmod_mock)

    file_path.rmtree(root)

  @unittest.skipIf(sys.platform == 'win32', 'posix only')
  def test_rmtree_with_sudo_chmod(self):
    """rmtree() completes when only the subprocess42.call path unlocks dirs."""
    root, dirs, _ = self._make_tree()

    # Emulate fs.rmtree() permission error.
    can_delete = self._mock_failing_fs_rmtree(dirs)

    # pylint: disable=unused-argument
    def subprocess_mock(cmd, stdin=None):
      # The path being fixed is expected at index 4 of the command line.
      path = cmd[4]
      can_delete.add(path)

    # The mock returns (does not raise) an OSError instance.
    self.mock(file_path, 'set_read_only_swallow', lambda *_: OSError('error'))
    self.mock(subprocess42, 'call', subprocess_mock)

    file_path.rmtree(root)

  def test_rmtree_unicode(self):
    """rmtree() removes a directory holding a file with a non-ASCII name."""
    subdir = os.path.join(self.tempdir, 'hi')
    fs.mkdir(subdir)
    filepath = os.path.join(subdir,
                            '\u0627\u0644\u0635\u064A\u0646\u064A\u0629')
    with fs.open(filepath, 'wb') as f:
      f.write(b'hi')
    # In particular, it fails when the input argument is a str.
    file_path.rmtree(str(subdir))

  if sys.platform == 'win32':
    def test_rmtree_win(self):
      """rmtree() completes when chmod unlocks the files."""
      root, _, files = self._make_tree()

      # Emulate fs.rmtree() permission error.
      can_delete = self._mock_failing_fs_rmtree(files)

      def chmod_mock(path, _mode):
        can_delete.add(path)

      self.mock(fs, 'chmod', chmod_mock)

      file_path.rmtree(root)

    def test_rmtree_outliving_processes(self):
      """rmtree() succeeds although a child process runs inside the tree."""
      # Mock our sleep for faster test case execution.
      sleeps = []
      self.mock(time, 'sleep', sleeps.append)
      self.mock(sys, 'stderr', io.StringIO())

      # Open a child process, so the file is locked.
      subdir = os.path.join(self.tempdir, 'to_be_deleted')
      fs.mkdir(subdir)
      script = 'import time; open(\'a\', \'w\'); time.sleep(60)'
      proc = subprocess.Popen([sys.executable, '-c', script], cwd=subdir)
      try:
        # Wait until the file exist.
        while not fs.isfile(os.path.join(subdir, 'a')):
          self.assertIsNone(proc.poll())
        file_path.rmtree(subdir)
        self.assertEqual([4, 2], sleeps)
        # sys.stderr.getvalue() would return a fair amount of output but it is
        # not completely deterministic so we're not testing it here.
      finally:
        proc.wait()

    def test_filter_processes_dir_win(self):
      """The running interpreter is found among processes of its directory."""
      python_dir = os.path.dirname(sys.executable)
      processes = file_path._filter_processes_dir_win(
          file_path._enum_processes_win(), python_dir)
      self.assertTrue(processes)
      proc_names = [proc.ExecutablePath for proc in processes]
      # Try to find at least one python process.
      self.assertTrue(
          any(proc == sys.executable for proc in proc_names), proc_names)

    def test_filter_processes_tree_win(self):
      """Expects exactly 3 processes once a child and grand-child are up."""
      # Create a grand-child.
      script = (
          'import subprocess,sys;'
          'proc = subprocess.Popen('
          '[sys.executable, \'-u\', \'-c\', \'import time; print(1); '
          'time.sleep(60)\'], stdout=subprocess.PIPE); '
          # Signal grand child is ready.
          'print(proc.stdout.read(1)); '
          # Wait for parent to have completed the test.
          'sys.stdin.read(1); '
          'proc.kill()'
      )
      proc = subprocess.Popen(
          [sys.executable, '-u', '-c', script],
          stdin=subprocess.PIPE,
          stdout=subprocess.PIPE)
      try:
        proc.stdout.read(1)
        processes = file_path.filter_processes_tree_win(
            file_path._enum_processes_win())
        self.assertEqual(3, len(processes), processes)
        # The pipe is a binary stream: write bytes and flush so the child is
        # actually released from sys.stdin.read(1).
        proc.stdin.write(b'a')
        proc.stdin.flush()
        proc.wait()
      except Exception:
        proc.kill()
        # Re-raise so that assertion failures are reported instead of being
        # silently swallowed.
        raise
      finally:
        proc.wait()

  if sys.platform != 'win32':
    def test_symlink(self):
      """split_at_symlink() splits paths around tests/trace_inputs/files2."""
      # This test will fail if the checkout is in a symlink.
      actual = file_path.split_at_symlink(None, test_env.CLIENT_DIR)
      expected = (test_env.CLIENT_DIR, None, None)
      self.assertEqual(expected, actual)

      actual = file_path.split_at_symlink(
          None, os.path.join(test_env.TESTS_DIR, 'trace_inputs'))
      expected = (os.path.join(test_env.TESTS_DIR, 'trace_inputs'), None, None)
      self.assertEqual(expected, actual)

      actual = file_path.split_at_symlink(
          None, os.path.join(test_env.TESTS_DIR, 'trace_inputs', 'files2'))
      expected = (
          os.path.join(test_env.TESTS_DIR, 'trace_inputs'), 'files2', '')
      self.assertEqual(expected, actual)

      actual = file_path.split_at_symlink(
          test_env.CLIENT_DIR, os.path.join('tests', 'trace_inputs', 'files2'))
      expected = (
          os.path.join('tests', 'trace_inputs'), 'files2', '')
      self.assertEqual(expected, actual)
      actual = file_path.split_at_symlink(
          test_env.CLIENT_DIR,
          os.path.join('tests', 'trace_inputs', 'files2', 'bar'))
      expected = (os.path.join('tests', 'trace_inputs'), 'files2', '/bar')
      self.assertEqual(expected, actual)

  else:

    def test_undeleteable_chmod(self):
      # Create a file and a directory, strip all their mode bits with
      # chmod(0), then try to delete them.
      dirpath = os.path.join(self.tempdir, 'd')
      filepath = os.path.join(dirpath, 'f')
      os.mkdir(dirpath)
      with open(filepath, 'w') as f:
        f.write('hi')
      os.chmod(filepath, 0)
      os.chmod(dirpath, 0)
      file_path.rmtree(dirpath)

    def test_undeleteable_owner(self):
      # Create a file and a directory with an empty ACL. Then try to delete it.
      dirpath = os.path.join(self.tempdir, 'd')
      filepath = os.path.join(dirpath, 'f')
      os.mkdir(dirpath)
      with open(filepath, 'w') as f:
        f.write('hi')
      # Imported here because this test is only defined on Windows.
      import win32security
      user, _domain, _type = win32security.LookupAccountName(
          '', getpass.getuser())
      sd = win32security.SECURITY_DESCRIPTOR()
      sd.Initialize()
      sd.SetSecurityDescriptorOwner(user, False)
      # Create an empty DACL, which removes all rights.
      dacl = win32security.ACL()
      dacl.Initialize()
      sd.SetSecurityDescriptorDacl(1, dacl, 0)
      win32security.SetFileSecurity(
          fs.extend(filepath), win32security.DACL_SECURITY_INFORMATION, sd)
      win32security.SetFileSecurity(
          fs.extend(dirpath), win32security.DACL_SECURITY_INFORMATION, sd)
      file_path.rmtree(dirpath)

  @staticmethod
  def _mklink(link, target, flag=None):
    """Runs 'cmd /c mklink [flag] link target' (Windows only).

    The command is passed as an argument list so that paths containing spaces
    are quoted by subprocess instead of being split.

    Args:
      link: path of the link to create.
      target: path the link points to.
      flag: optional mklink switch, e.g. '/d', '/j' or '/h'.

    Raises:
      subprocess.CalledProcessError: if mklink exits with a non-zero code.
    """
    cmd = ['cmd', '/c', 'mklink']
    if flag:
      cmd.append(flag)
    cmd.extend([link, target])
    subprocess.check_call(cmd)

  def _check_get_recursive_size(self, symlink='symlink'):
    """Checks get_recursive_size() on a growing tree, then with links added.

    Args:
      symlink: kind of link added at the end; one of 'symlink', 'junction'
          or 'hardlink'. The last two rely on mklink and so on Windows.
    """
    # Test that _get_recursive_size calculates file size recursively.
    file_one = os.path.join(self.tempdir, '1')
    with open(file_one, 'w') as f:
      f.write('0')
    self.assertEqual(file_path.get_recursive_size(self.tempdir), 1)

    with open(os.path.join(self.tempdir, '2'), 'w') as f:
      f.write('01')
    self.assertEqual(file_path.get_recursive_size(self.tempdir), 3)

    nested_dir = os.path.join(self.tempdir, 'dir1', 'dir2')
    os.makedirs(nested_dir)
    with open(os.path.join(nested_dir, '4'), 'w') as f:
      f.write('0123')
    self.assertEqual(file_path.get_recursive_size(self.tempdir), 7)

    # Add an unreadable directory.
    secure_dir = os.path.join(self.tempdir, 'dir_secure')
    os.makedirs(secure_dir, mode=0o000)
    self.assertEqual(file_path.get_recursive_size(self.tempdir), 7)

    symlink_dir = os.path.join(self.tempdir, 'symlink_dir')
    symlink_file = os.path.join(self.tempdir, 'symlink_file')
    if symlink == 'symlink':
      if sys.platform == 'win32':
        self._mklink(symlink_dir, nested_dir, '/d')
        self._mklink(symlink_file, file_one)
      else:
        os.symlink(nested_dir, symlink_dir)
        os.symlink(file_one, symlink_file)
    elif symlink == 'junction':
      # junction should be ignored.
      self._mklink(symlink_dir, nested_dir, '/j')

      # This is invalid junction, junction can be made only for directory.
      self._mklink(symlink_file, file_one, '/j')
    elif symlink == 'hardlink':
      # hardlink can be made only for file.
      self._mklink(symlink_file, file_one, '/h')
    else:
      # self.fail() instead of a bare assert so the check survives python -O.
      self.fail("symlink should be one of symlink, "
                "junction or hardlink, but: %s" % symlink)

    if symlink == 'hardlink':
      # hardlinked file is double counted.
      self.assertEqual(file_path.get_recursive_size(self.tempdir), 8)
    else:
      # symlink and junction should be ignored.
      self.assertEqual(file_path.get_recursive_size(self.tempdir), 7)

  def test_get_recursive_size(self):
    self._check_get_recursive_size()

  @unittest.skipUnless(sys.platform == 'win32', 'Windows specific')
  def test_get_recursive_size_win_junction(self):
    self._check_get_recursive_size(symlink='junction')

  @unittest.skipUnless(sys.platform == 'win32', 'Windows specific')
  def test_get_recursive_size_win_hardlink(self):
    self._check_get_recursive_size(symlink='hardlink')
maruel@chromium.org561d4b22013-09-26 21:08:08 +0000454
if __name__ == '__main__':
  # Run the tests through test_env's entry point when executed as a script.
  test_env.main()