#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Test gsutil.py."""
7
8
9import __builtin__
hinoka@chromium.org7a790542014-12-10 02:04:39 +000010import base64
hinoka@chromium.org5e879a42015-01-24 00:55:46 +000011import hashlib
hinoka@chromium.org7a790542014-12-10 02:04:39 +000012import json
13import os
hinoka@chromium.org5e879a42015-01-24 00:55:46 +000014import shutil
15import subprocess
16import sys
17import tempfile
18import unittest
primiano@chromium.orgdf351762014-12-18 11:12:34 +000019import urllib2
hinoka@chromium.org5e879a42015-01-24 00:55:46 +000020import zipfile
hinoka@chromium.org7a790542014-12-10 02:04:39 +000021
22
# Add depot_tools (the parent of this file's directory) to the import path so
# that the `gsutil` module can be imported below.
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR)
sys.path.append(DEPOT_TOOLS_DIR)
27
28import gsutil
29
30
class TestError(Exception):
  """Raised by FakeCall for an unexpected or mismatched call."""
33
34
class Buffer(object):
  """Minimal in-memory stand-in for a readable/writable file-like object.

  Data appended with write() is consumed from the front by read().
  """

  def __init__(self, data=None):
    """Starts the buffer with `data`, or empty when `data` is None/empty."""
    self.data = data or ''

  def write(self, buf):
    """Appends `buf` to the end of the buffered data."""
    self.data += buf

  def read(self, amount=None):
    """Removes and returns up to `amount` items from the front of the buffer.

    Args:
      amount: Maximum length to return. None or a negative value drains the
          whole buffer; 0 returns an empty result and consumes nothing.

    Returns:
      The consumed prefix of the buffered data (empty once exhausted).
    """
    # Test for None explicitly: `not amount` would treat read(0) as "read
    # everything", unlike a real file object.
    if amount is None or amount < 0:
      amount = len(self.data)
    chunk, self.data = self.data[:amount], self.data[amount:]
    return chunk
48
49
class FakeCall(object):
  """Callable test double that replays a first-in, first-out list of calls.

  Each expectation records the exact positional and keyword arguments a call
  must be made with, plus the value to hand back when it matches.
  """

  def __init__(self):
    # Pending (args, kwargs, returns) tuples, oldest first. Kept as a plain
    # list so callers can compare it against [] to check it was drained.
    self.expectations = []

  def add_expectation(self, *args, **kwargs):
    """Queues one expected call.

    The reserved keyword `_returns` (default None) is stripped from kwargs and
    stored as the value returned when the expectation is consumed.
    """
    returns = kwargs.pop('_returns', None)
    self.expectations.append((args, kwargs, returns))

  def __call__(self, *args, **kwargs):
    """Consumes the oldest expectation and returns its recorded value.

    Raises:
      TestError: if nothing is queued, or if the arguments differ from the
          oldest queued expectation (which is consumed either way).
    """
    if not self.expectations:
      raise TestError('Got unexpected\n%s\n%s' % (args, kwargs))
    exp_args, exp_kwargs, exp_returns = self.expectations.pop(0)
    if args != exp_args or kwargs != exp_kwargs:
      message = 'Expected:\n args: %s\n kwargs: %s\n' % (exp_args, exp_kwargs)
      message += 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs)
      raise TestError(message)
    return exp_returns
67
68
class GsutilUnitTests(unittest.TestCase):
  """Tests for gsutil.py with urllib2.urlopen and subprocess.call faked out.

  Both functions are replaced by a single FakeCall for the duration of each
  test, so every network fetch and subprocess launch must be declared up front
  with add_expectation().
  """

  def setUp(self):
    self.fake = FakeCall()
    self.tempdir = tempfile.mkdtemp()
    self.old_urlopen = urllib2.urlopen
    self.old_call = subprocess.call
    urllib2.urlopen = self.fake
    subprocess.call = self.fake

  def tearDown(self):
    # Restore the patched functions and remove the scratch directory *before*
    # asserting, so a failed assertion cannot leak the fakes (or the temp
    # directory) into later tests.
    try:
      shutil.rmtree(self.tempdir)
    finally:
      urllib2.urlopen = self.old_urlopen
      subprocess.call = self.old_call
    # Every declared expectation must have been consumed by the test.
    self.assertEqual(self.fake.expectations, [])

  def test_download_gsutil(self):
    """Exercises download_gsutil() on a fresh, a cached and a stale zip."""
    version = '4.2'
    filename = 'gsutil_%s.zip' % version
    full_filename = os.path.join(self.tempdir, filename)
    fake_file = 'This is gsutil.zip'
    fake_file2 = 'This is other gsutil.zip'
    url = '%s%s' % (gsutil.GSUTIL_URL, filename)

    # 1) Nothing on disk yet: only the download URL is expected.
    self.fake.add_expectation(url, _returns=Buffer(fake_file))
    self.assertEqual(
        gsutil.download_gsutil(version, self.tempdir), full_filename)
    with open(full_filename, 'r') as f:
      self.assertEqual(fake_file, f.read())

    # 2) File exists and the metadata MD5 is built from the same contents:
    #    only the metadata URL is expected and the file stays unchanged.
    metadata_url = gsutil.API_URL + filename
    md5_calc = hashlib.md5()
    md5_calc.update(fake_file)
    b64_md5 = base64.b64encode(md5_calc.hexdigest())
    self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({
        'md5Hash': b64_md5
    })))
    self.assertEqual(
        gsutil.download_gsutil(version, self.tempdir), full_filename)
    with open(full_filename, 'r') as f:
      self.assertEqual(fake_file, f.read())
    self.assertEqual(self.fake.expectations, [])

    # 3) Metadata reports a different MD5: the metadata fetch is followed by
    #    a fresh download that replaces the file contents.
    self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({
        'md5Hash': base64.b64encode('aaaaaaa')  # Bad MD5
    })))
    self.fake.add_expectation(url, _returns=Buffer(fake_file2))
    self.assertEqual(
        gsutil.download_gsutil(version, self.tempdir), full_filename)
    with open(full_filename, 'r') as f:
      self.assertEqual(fake_file2, f.read())
    self.assertEqual(self.fake.expectations, [])

  def test_ensure_gsutil_full(self):
    """ensure_gsutil() with a failing version check before and after download."""
    version = '4.2'
    gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
    gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
    os.makedirs(gsutil_dir)

    # First `gsutil version` invocation: the fake returns exit code 1.
    self.fake.add_expectation(
        [sys.executable, gsutil_bin, 'version'], stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT, _returns=1)

    with open(gsutil_bin, 'w') as f:
      f.write('Foobar')
    zip_filename = 'gsutil_%s.zip' % version
    url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename)
    fake_gsutil = 'Fake gsutil'

    # Build the zip payload in a scratch file. mkstemp() hands back an open
    # descriptor and never deletes the file, so close and remove it explicitly.
    fd, tempzip = tempfile.mkstemp()
    os.close(fd)
    try:
      with zipfile.ZipFile(tempzip, 'w') as zf:
        zf.writestr('gsutil/gsutil', fake_gsutil)
      with open(tempzip, 'rb') as f:
        self.fake.add_expectation(url, _returns=Buffer(f.read()))
    finally:
      os.remove(tempzip)

    # Second `gsutil version` invocation also returns exit code 1.
    self.fake.add_expectation(
        [sys.executable, gsutil_bin, 'version'], stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT, _returns=1)

    # This should delete the old bin and rewrite it with 'Fake gsutil'
    self.assertRaises(
        gsutil.InvalidGsutilError, gsutil.ensure_gsutil, version, self.tempdir)
    self.assertTrue(os.path.isdir(os.path.join(self.tempdir, '.cache_dir')))
    self.assertTrue(os.path.exists(gsutil_bin))
    with open(gsutil_bin, 'r') as f:
      self.assertEqual(f.read(), fake_gsutil)
    self.assertEqual(self.fake.expectations, [])

  def test_ensure_gsutil_short(self):
    """ensure_gsutil() returns the existing binary when the check exits 0."""
    version = '4.2'
    gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
    gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
    os.makedirs(gsutil_dir)

    # Mock out call().
    self.fake.add_expectation(
        [sys.executable, gsutil_bin, 'version'],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, _returns=0)

    with open(gsutil_bin, 'w') as f:
      f.write('Foobar')
    self.assertEqual(
        gsutil.ensure_gsutil(version, self.tempdir), gsutil_bin)
169
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
  unittest.main()