#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Archives a set of files to a server."""

import binascii
import hashlib
import logging
import optparse
import os
import sys
import time
import urllib2

import run_isolated


# The maximum number of upload attempts to try when uploading a single file.
MAX_UPLOAD_ATTEMPTS = 5

# The minimum size of files to upload directly to the blobstore.
MIN_SIZE_FOR_DIRECT_BLOBSTORE = 20 * 8


def encode_multipart_formdata(fields, files,
                              mime_mapper=lambda _: 'application/octet-stream'):
  """Encodes a Multipart form data object.

  Args:
    fields: a sequence of (name, value) elements for regular form fields.
    files: a sequence of (name, filename, value) elements for data to be
        uploaded as files.
    mime_mapper: function to return the mime type from the filename.
  Returns:
    content_type: for httplib.HTTP instance
    body: for httplib.HTTP instance
  """
  boundary = hashlib.md5(str(time.time())).hexdigest()
  body_list = []
  for (key, value) in fields:
    if isinstance(key, unicode):
      key = key.encode('utf-8')
    if isinstance(value, unicode):
      value = value.encode('utf-8')
    body_list.append('--' + boundary)
    body_list.append('Content-Disposition: form-data; name="%s"' % key)
    body_list.append('')
    body_list.append(value)
    body_list.append('--' + boundary)
    body_list.append('')
  for (key, filename, value) in files:
    if isinstance(key, unicode):
      key = key.encode('utf-8')
    if isinstance(filename, unicode):
      filename = filename.encode('utf-8')
    if isinstance(value, unicode):
      value = value.encode('utf-8')
    body_list.append('--' + boundary)
    body_list.append('Content-Disposition: form-data; name="%s"; '
                     'filename="%s"' % (key, filename))
    body_list.append('Content-Type: %s' % mime_mapper(filename))
    body_list.append('')
    body_list.append(value)
    body_list.append('--' + boundary)
    body_list.append('')
  if body_list:
    # Mark the final boundary as the terminating one by appending '--'.
    body_list[-2] += '--'
  body = '\r\n'.join(body_list)
  content_type = 'multipart/form-data; boundary=%s' % boundary
  return content_type, body
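
# A minimal usage sketch of encode_multipart_formdata() (field and file values
# are made up). The last boundary in the body carries the trailing '--' that
# terminates a multipart/form-data payload.
#
#   content_type, body = encode_multipart_formdata(
#       [('namespace', 'default')],
#       [('hash_contents', 'hash_content', 'raw file bytes')])
#   # content_type == 'multipart/form-data; boundary=<md5 hex digest>'
#   # body is the CRLF-joined multipart payload, ready to POST via url_open().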


def gen_url_request(url, payload, content_type='application/octet-stream'):
  """Returns a POST request when |payload| is given, a GET request otherwise."""
  request = urllib2.Request(url, data=payload)
  if payload is not None:
    request.add_header('Content-Type', content_type)
    request.add_header('Content-Length', len(payload))
  return request


def url_open(url, data, content_type='application/octet-stream'):
  """Opens the given url with the given data, repeating up to
  MAX_UPLOAD_ATTEMPTS times if it encounters an error.

  Arguments:
    url: The url to open.
    data: The data to send to the url.

  Returns:
    The response from the url, or raises an exception if it failed to get
    a response.
  """
  request = gen_url_request(url, data, content_type)
  for i in range(MAX_UPLOAD_ATTEMPTS):
    try:
      return urllib2.urlopen(request)
    except urllib2.URLError as e:
      logging.warning('Unable to connect to %s, error msg: %s', url, e)
      time.sleep(0.5 + i)

  # If we get no response from the server after MAX_UPLOAD_ATTEMPTS tries,
  # assume it is down and raise an exception.
  raise run_isolated.MappingError(
      'Unable to connect to server, %s, to see which files are present' %
      url)
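
# Retry timing note for url_open() above: with MAX_UPLOAD_ATTEMPTS == 5, a
# persistently failing request sleeps 0.5 + i seconds after attempt i = 0..4,
# i.e. 0.5, 1.5, 2.5, 3.5 and 4.5 seconds (12.5 seconds in total) before
# run_isolated.MappingError is finally raised.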


def upload_hash_content_to_blobstore(generate_upload_url, content):
  """Uploads the given hash content directly to the blobstore via a generated
  url.

  Arguments:
    generate_upload_url: The url to get the new upload url from.
    content: The content to upload.
  """
  logging.debug('Generating url to directly upload file to blobstore')
  upload_url = url_open(generate_upload_url, None).read()

  if not upload_url:
    logging.error('Unable to generate upload url')
    return

  content_type, body = encode_multipart_formdata(
      [], [('hash_contents', 'hash_content', content)])
  url_open(upload_url, body, content_type)
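
# The function above is a two-step handshake: the first url_open() call uses a
# None payload (urllib2 turns that into a plain GET) and the response body is
# the freshly generated one-shot blobstore upload url; the content is then
# POSTed there as a multipart form. A sketch with a hypothetical server url:
#
#   upload_hash_content_to_blobstore(
#       'http://example.com/content/generate_blobstore_url/default/' + sha1,
#       file_contents)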


class UploadRemote(run_isolated.Remote):
  @staticmethod
  def get_file_handler(base_url):
    def upload_file(content, hash_key):
      content_url = base_url.rstrip('/') + '/content/'
      namespace = 'default'
      if len(content) > MIN_SIZE_FOR_DIRECT_BLOBSTORE:
        upload_hash_content_to_blobstore(
            content_url + 'generate_blobstore_url/' + namespace + '/' +
                hash_key,
            content)
      else:
        url_open(content_url + 'store/' + namespace + '/' + hash_key, content)
    return upload_file
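
# Sketch of how the handler above is consumed (values are made up): the
# run_isolated.Remote machinery is expected to call get_file_handler() once
# and then invoke the returned closure for every queued item.
#
#   upload_file = UploadRemote.get_file_handler('http://example.com')
#   upload_file(file_contents, '0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33')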


def update_files_to_upload(query_url, queries, files_to_upload):
  """Queries the server to see which files from this batch already exist there.

  Arguments:
    queries: The hash files to potentially upload to the server.
    files_to_upload: Any new files that need to be uploaded are added to
                     this list.
  """
  body = ''.join(
      (binascii.unhexlify(meta_data['sha-1']) for (_, meta_data) in queries))
  assert (len(body) % 20) == 0, repr(body)

  response = url_open(query_url, body).read()
  if len(queries) != len(response):
    raise run_isolated.MappingError(
        'Got an incorrect number of responses from the server. Expected %d, '
        'but got %d' % (len(queries), len(response)))

  hit = 0
  for i in range(len(response)):
    if response[i] == chr(0):
      files_to_upload.append(queries[i])
    else:
      hit += 1
  logging.info('Queried %d files, %d cache hit', len(queries), hit)
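
# Wire format implied by the code above: the request body is the raw 20-byte
# SHA-1 digests of all queried files concatenated back to back, and the
# response carries exactly one byte per query, where '\x00' means "missing,
# please upload" and any other byte counts as a cache hit. For instance, a
# single query for the empty-file digest sends
# binascii.unhexlify('da39a3ee5e6b4b0d3255bfef95601890afd80709') and expects
# a 1-byte reply.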


def upload_sha1_tree(base_url, indir, infiles):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
              query if an element was already uploaded, and |base_url|/store/
              can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to map from |indir| to |outdir|.
  """
  logging.info('upload tree(base_url=%s, indir=%s, files=%d)' %
               (base_url, indir, len(infiles)))

  # Generate the list of files that need to be uploaded (since some may
  # already be on the server).
  base_url = base_url.rstrip('/')
  contains_hash_url = base_url + '/content/contains/default'
  to_upload = []
  next_queries = []
  for relfile, metadata in infiles.iteritems():
    if 'link' in metadata:
      # Skip links when uploading.
      continue

    next_queries.append((relfile, metadata))
    if len(next_queries) == 1000:
      update_files_to_upload(contains_hash_url, next_queries, to_upload)
      next_queries = []

  if next_queries:
    update_files_to_upload(contains_hash_url, next_queries, to_upload)

  # Upload the required files.
  remote_uploader = UploadRemote(base_url)
  for relfile, metadata in to_upload:
    # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
    # if metadata.get('touched_only') == True:
    #   hash_data = ''
    infile = os.path.join(indir, relfile)
    with open(infile, 'rb') as f:
      hash_data = f.read()
    priority = (run_isolated.Remote.HIGH if metadata.get('priority', '1') == '0'
                else run_isolated.Remote.MED)
    remote_uploader.add_item(priority, hash_data, metadata['sha-1'])
  remote_uploader.join()

  exception = remote_uploader.next_exception()
  if exception:
    raise exception[0], exception[1], exception[2]
  total = len(infiles)
  total_size = sum(metadata.get('size', 0) for metadata in infiles.itervalues())
  logging.info(
      'Total: %6d, %9.1fkb',
      total,
      total_size / 1024.)
  cache_hit = set(infiles.iterkeys()) - set(x[0] for x in to_upload)
  cache_hit_size = sum(infiles[i].get('size', 0) for i in cache_hit)
  logging.info(
      'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total,
      cache_hit_size * 100. / total_size)
  cache_miss = to_upload
  cache_miss_size = sum(infiles[i[0]].get('size', 0) for i in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total,
      cache_miss_size * 100. / total_size)
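
# End-to-end usage sketch for upload_sha1_tree() (paths, server and metadata
# are made up; the sha-1 shown is the digest of the 3-byte string 'foo'):
#
#   upload_sha1_tree(
#       base_url='http://example.com',
#       indir='/path/to/files',
#       infiles={
#           'foo.txt': {
#               'size': 3,
#               'sha-1': '0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33',
#           },
#       })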


def main():
  parser = optparse.OptionParser(
      usage='%prog [options] <file1..fileN> or - to read from stdin',
      description=sys.modules[__name__].__doc__)
  # TODO(maruel): Support both NFS and isolateserver.
  parser.add_option('-o', '--outdir', help='Remote server to archive to')
  parser.add_option(
      '-v', '--verbose',
      action='count',
      default=0,
      help='Use multiple times to increase verbosity')

  options, files = parser.parse_args()

  levels = [logging.ERROR, logging.INFO, logging.DEBUG]
  logging.basicConfig(
      level=levels[min(len(levels)-1, options.verbose)],
      format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s')
  if files == ['-']:
    files = sys.stdin.read().splitlines()

  if not files:
    parser.error('Nothing to upload')
  if not options.outdir:
    parser.error('Nowhere to send. Please specify --outdir')

  # Load the necessary metadata. This is going to be rewritten eventually to be
  # more efficient.
  infiles = dict(
      (
        f,
        {
          'size': os.stat(f).st_size,
          'sha-1': hashlib.sha1(open(f, 'rb').read()).hexdigest(),
        }
      )
      for f in files)

  with run_isolated.Profiler('Archive'):
    upload_sha1_tree(
        base_url=options.outdir,
        indir=os.getcwd(),
        infiles=infiles)
  return 0


if __name__ == '__main__':
  sys.exit(main())