blob: a27b534ff1ef3fbb7ce3c38006d8fdd4f5b7f24a [file] [log] [blame]
#!/usr/bin/python2.4
# Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This is a simple HTTP server used for testing Chrome.
It supports several test URLs, as specified by the handlers in TestPageHandler.
It defaults to living on localhost:8888.
It can use https if you specify the flag --https=CERT where CERT is the path
to a pem file containing the certificate and private key that should be used.
To shut it down properly, visit localhost:8888/kill.
"""
import base64
import BaseHTTPServer
import cgi
import optparse
import os
import re
import shutil
import SocketServer
import sys
import time
import tlslite
import tlslite.api
import pyftpdlib.ftpserver
try:
import hashlib
_new_md5 = hashlib.md5
except ImportError:
import md5
_new_md5 = md5.new
SERVER_HTTP = 0
SERVER_FTP = 1
debug_output = sys.stderr
def debug(str):
debug_output.write(str + "\n")
debug_output.flush()
class StoppableHTTPServer(BaseHTTPServer.HTTPServer):
"""This is a specialization of of BaseHTTPServer to allow it
to be exited cleanly (by setting its "stop" member to True)."""
def serve_forever(self):
self.stop = False
self.nonce = None
while not self.stop:
self.handle_request()
self.socket.close()
class HTTPSServer(tlslite.api.TLSSocketServerMixIn, StoppableHTTPServer):
"""This is a specialization of StoppableHTTPerver that add https support."""
def __init__(self, server_address, request_hander_class, cert_path):
s = open(cert_path).read()
x509 = tlslite.api.X509()
x509.parse(s)
self.cert_chain = tlslite.api.X509CertChain([x509])
s = open(cert_path).read()
self.private_key = tlslite.api.parsePEMKey(s, private=True)
self.session_cache = tlslite.api.SessionCache()
StoppableHTTPServer.__init__(self, server_address, request_hander_class)
def handshake(self, tlsConnection):
"""Creates the SSL connection."""
try:
tlsConnection.handshakeServer(certChain=self.cert_chain,
privateKey=self.private_key,
sessionCache=self.session_cache)
tlsConnection.ignoreAbruptClose = True
return True
except tlslite.api.TLSError, error:
print "Handshake failure:", str(error)
return False
class TestPageHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def __init__(self, request, client_address, socket_server):
self._connect_handlers = [
self.RedirectConnectHandler,
self.ServerAuthConnectHandler,
self.DefaultConnectResponseHandler]
self._get_handlers = [
self.KillHandler,
self.NoCacheMaxAgeTimeHandler,
self.NoCacheTimeHandler,
self.CacheTimeHandler,
self.CacheExpiresHandler,
self.CacheProxyRevalidateHandler,
self.CachePrivateHandler,
self.CachePublicHandler,
self.CacheSMaxAgeHandler,
self.CacheMustRevalidateHandler,
self.CacheMustRevalidateMaxAgeHandler,
self.CacheNoStoreHandler,
self.CacheNoStoreMaxAgeHandler,
self.CacheNoTransformHandler,
self.DownloadHandler,
self.DownloadFinishHandler,
self.EchoHeader,
self.EchoAllHandler,
self.FileHandler,
self.RealFileWithCommonHeaderHandler,
self.RealBZ2FileWithCommonHeaderHandler,
self.AuthBasicHandler,
self.AuthDigestHandler,
self.SlowServerHandler,
self.ContentTypeHandler,
self.ServerRedirectHandler,
self.ClientRedirectHandler,
self.DefaultResponseHandler]
self._post_handlers = [
self.WriteFile,
self.EchoTitleHandler,
self.EchoAllHandler,
self.EchoHandler] + self._get_handlers
self._mime_types = {
'gif': 'image/gif',
'jpeg' : 'image/jpeg',
'jpg' : 'image/jpeg'
}
self._default_mime_type = 'text/html'
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request,
client_address,
socket_server)
def _ShouldHandleRequest(self, handler_name):
"""Determines if the path can be handled by the handler.
We consider a handler valid if the path begins with the
handler name. It can optionally be followed by "?*", "/*".
"""
pattern = re.compile('%s($|\?|/).*' % handler_name)
return pattern.match(self.path)
def GetMIMETypeFromName(self, file_name):
"""Returns the mime type for the specified file_name. So far it only looks
at the file extension."""
(shortname, extension) = os.path.splitext(file_name)
if len(extension) == 0:
# no extension.
return self._default_mime_type
# extension starts with a dot, so we need to remove it
return self._mime_types.get(extension[1:], self._default_mime_type)
def KillHandler(self):
"""This request handler kills the server, for use when we're done"
with the a particular test."""
if (self.path.find("kill") < 0):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'max-age=0')
self.end_headers()
self.wfile.write("Time to die")
self.server.stop = True
return True
def NoCacheMaxAgeTimeHandler(self):
"""This request handler yields a page with the title set to the current
system time, and no caching requested."""
if not self._ShouldHandleRequest("/nocachetime/maxage"):
return False
self.send_response(200)
self.send_header('Cache-Control', 'max-age=0')
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def NoCacheTimeHandler(self):
"""This request handler yields a page with the title set to the current
system time, and no caching requested."""
if not self._ShouldHandleRequest("/nocachetime"):
return False
self.send_response(200)
self.send_header('Cache-Control', 'no-cache')
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheTimeHandler(self):
"""This request handler yields a page with the title set to the current
system time, and allows caching for one minute."""
if not self._ShouldHandleRequest("/cachetime"):
return False
self.send_response(200)
self.send_header('Cache-Control', 'max-age=60')
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheExpiresHandler(self):
"""This request handler yields a page with the title set to the current
system time, and set the page to expire on 1 Jan 2099."""
if not self._ShouldHandleRequest("/cache/expires"):
return False
self.send_response(200)
self.send_header('Expires', 'Thu, 1 Jan 2099 00:00:00 GMT')
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheProxyRevalidateHandler(self):
"""This request handler yields a page with the title set to the current
system time, and allows caching for 60 seconds"""
if not self._ShouldHandleRequest("/cache/proxy-revalidate"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'max-age=60, proxy-revalidate')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CachePrivateHandler(self):
"""This request handler yields a page with the title set to the current
system time, and allows caching for 5 seconds."""
if not self._ShouldHandleRequest("/cache/private"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'max-age=5, private')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CachePublicHandler(self):
"""This request handler yields a page with the title set to the current
system time, and allows caching for 5 seconds."""
if not self._ShouldHandleRequest("/cache/public"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'max-age=5, public')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheSMaxAgeHandler(self):
"""This request handler yields a page with the title set to the current
system time, and does not allow for caching."""
if not self._ShouldHandleRequest("/cache/s-maxage"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'public, s-maxage = 60, max-age = 0')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheMustRevalidateHandler(self):
"""This request handler yields a page with the title set to the current
system time, and does not allow caching."""
if not self._ShouldHandleRequest("/cache/must-revalidate"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'must-revalidate')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheMustRevalidateMaxAgeHandler(self):
"""This request handler yields a page with the title set to the current
system time, and does not allow caching event though max-age of 60
seconds is specified."""
if not self._ShouldHandleRequest("/cache/must-revalidate/max-age"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'max-age=60, must-revalidate')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheNoStoreHandler(self):
"""This request handler yields a page with the title set to the current
system time, and does not allow the page to be stored."""
if not self._ShouldHandleRequest("/cache/no-store"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'no-store')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheNoStoreMaxAgeHandler(self):
"""This request handler yields a page with the title set to the current
system time, and does not allow the page to be stored even though max-age
of 60 seconds is specified."""
if not self._ShouldHandleRequest("/cache/no-store/max-age"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'max-age=60, no-store')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def CacheNoTransformHandler(self):
"""This request handler yields a page with the title set to the current
system time, and does not allow the content to transformed during
user-agent caching"""
if not self._ShouldHandleRequest("/cache/no-transform"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'no-transform')
self.end_headers()
self.wfile.write('<html><head><title>%s</title></head></html>' %
time.time())
return True
def EchoHeader(self):
"""This handler echoes back the value of a specific request header."""
if not self._ShouldHandleRequest("/echoheader"):
return False
query_char = self.path.find('?')
if query_char != -1:
header_name = self.path[query_char+1:]
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.send_header('Cache-control', 'max-age=60000')
# insert a vary header to properly indicate that the cachability of this
# request is subject to value of the request header being echoed.
if len(header_name) > 0:
self.send_header('Vary', header_name)
self.end_headers()
if len(header_name) > 0:
self.wfile.write(self.headers.getheader(header_name))
return True
def EchoHandler(self):
"""This handler just echoes back the payload of the request, for testing
form submission."""
if not self._ShouldHandleRequest("/echo"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
length = int(self.headers.getheader('content-length'))
request = self.rfile.read(length)
self.wfile.write(request)
return True
def WriteFile(self):
"""This is handler dumps the content of POST request to a disk file into
the data_dir/dump. Sub-directories are not supported."""
prefix='/writefile/'
if not self.path.startswith(prefix):
return False
file_name = self.path[len(prefix):]
# do not allow fancy chars in file name
re.sub('[^a-zA-Z0-9_.-]+', '', file_name)
if len(file_name) and file_name[0] != '.':
path = os.path.join(self.server.data_dir, 'dump', file_name);
length = int(self.headers.getheader('content-length'))
request = self.rfile.read(length)
f = open(path, "wb")
f.write(request);
f.close()
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html>%s</html>' % file_name)
return True
def EchoTitleHandler(self):
"""This handler is like Echo, but sets the page title to the request."""
if not self._ShouldHandleRequest("/echotitle"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
length = int(self.headers.getheader('content-length'))
request = self.rfile.read(length)
self.wfile.write('<html><head><title>')
self.wfile.write(request)
self.wfile.write('</title></head></html>')
return True
def EchoAllHandler(self):
"""This handler yields a (more) human-readable page listing information
about the request header & contents."""
if not self._ShouldHandleRequest("/echoall"):
return False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head><style>'
'pre { border: 1px solid black; margin: 5px; padding: 5px }'
'</style></head><body>'
'<div style="float: right">'
'<a href="http://localhost:8888/echo">back to referring page</a></div>'
'<h1>Request Body:</h1><pre>')
if self.command == 'POST':
length = int(self.headers.getheader('content-length'))
qs = self.rfile.read(length)
params = cgi.parse_qs(qs, keep_blank_values=1)
for param in params:
self.wfile.write('%s=%s\n' % (param, params[param][0]))
self.wfile.write('</pre>')
self.wfile.write('<h1>Request Headers:</h1><pre>%s</pre>' % self.headers)
self.wfile.write('</body></html>')
return True
def DownloadHandler(self):
"""This handler sends a downloadable file with or without reporting
the size (6K)."""
if self.path.startswith("/download-unknown-size"):
send_length = False
elif self.path.startswith("/download-known-size"):
send_length = True
else:
return False
#
# The test which uses this functionality is attempting to send
# small chunks of data to the client. Use a fairly large buffer
# so that we'll fill chrome's IO buffer enough to force it to
# actually write the data.
# See also the comments in the client-side of this test in
# download_uitest.cc
#
size_chunk1 = 35*1024
size_chunk2 = 10*1024
self.send_response(200)
self.send_header('Content-type', 'application/octet-stream')
self.send_header('Cache-Control', 'max-age=0')
if send_length:
self.send_header('Content-Length', size_chunk1 + size_chunk2)
self.end_headers()
# First chunk of data:
self.wfile.write("*" * size_chunk1)
self.wfile.flush()
# handle requests until one of them clears this flag.
self.server.waitForDownload = True
while self.server.waitForDownload:
self.server.handle_request()
# Second chunk of data:
self.wfile.write("*" * size_chunk2)
return True
def DownloadFinishHandler(self):
"""This handler just tells the server to finish the current download."""
if not self._ShouldHandleRequest("/download-finish"):
return False
self.server.waitForDownload = False
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-Control', 'max-age=0')
self.end_headers()
return True
def FileHandler(self):
"""This handler sends the contents of the requested file. Wow, it's like
a real webserver!"""
prefix = self.server.file_root_url
if not self.path.startswith(prefix):
return False
file = self.path[len(prefix):]
entries = file.split('/');
path = os.path.join(self.server.data_dir, *entries)
if os.path.isdir(path):
path = os.path.join(path, 'index.html')
if not os.path.isfile(path):
print "File not found " + file + " full path:" + path
self.send_error(404)
return True
f = open(path, "rb")
data = f.read()
f.close()
# If file.mock-http-headers exists, it contains the headers we
# should send. Read them in and parse them.
headers_path = path + '.mock-http-headers'
if os.path.isfile(headers_path):
f = open(headers_path, "r")
# "HTTP/1.1 200 OK"
response = f.readline()
status_code = re.findall('HTTP/\d+.\d+ (\d+)', response)[0]
self.send_response(int(status_code))
for line in f:
# "name: value"
name, value = re.findall('(\S+):\s*(.*)', line)[0]
self.send_header(name, value)
f.close()
else:
# Could be more generic once we support mime-type sniffing, but for
# now we need to set it explicitly.
self.send_response(200)
self.send_header('Content-type', self.GetMIMETypeFromName(file))
self.send_header('Content-Length', len(data))
self.end_headers()
self.wfile.write(data)
return True
def RealFileWithCommonHeaderHandler(self):
"""This handler sends the contents of the requested file without the pseudo
http head!"""
prefix='/realfiles/'
if not self.path.startswith(prefix):
return False
file = self.path[len(prefix):]
path = os.path.join(self.server.data_dir, file)
try:
f = open(path, "rb")
data = f.read()
f.close()
# just simply set the MIME as octal stream
self.send_response(200)
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
self.wfile.write(data)
except:
self.send_error(404)
return True
def RealBZ2FileWithCommonHeaderHandler(self):
"""This handler sends the bzip2 contents of the requested file with
corresponding Content-Encoding field in http head!"""
prefix='/realbz2files/'
if not self.path.startswith(prefix):
return False
parts = self.path.split('?')
file = parts[0][len(prefix):]
path = os.path.join(self.server.data_dir, file) + '.bz2'
if len(parts) > 1:
options = parts[1]
else:
options = ''
try:
self.send_response(200)
accept_encoding = self.headers.get("Accept-Encoding")
if accept_encoding.find("bzip2") != -1:
f = open(path, "rb")
data = f.read()
f.close()
self.send_header('Content-Encoding', 'bzip2')
self.send_header('Content-type', 'application/x-bzip2')
self.end_headers()
if options == 'incremental-header':
self.wfile.write(data[:1])
self.wfile.flush()
time.sleep(1.0)
self.wfile.write(data[1:])
else:
self.wfile.write(data)
else:
"""client do not support bzip2 format, send pseudo content
"""
self.send_header('Content-type', 'text/html; charset=ISO-8859-1')
self.end_headers()
self.wfile.write("you do not support bzip2 encoding")
except:
self.send_error(404)
return True
def AuthBasicHandler(self):
"""This handler tests 'Basic' authentication. It just sends a page with
title 'user/pass' if you succeed."""
if not self._ShouldHandleRequest("/auth-basic"):
return False
username = userpass = password = b64str = ""
set_cookie_if_challenged = self.path.find('?set-cookie-if-challenged') > 0
auth = self.headers.getheader('authorization')
try:
if not auth:
raise Exception('no auth')
b64str = re.findall(r'Basic (\S+)', auth)[0]
userpass = base64.b64decode(b64str)
username, password = re.findall(r'([^:]+):(\S+)', userpass)[0]
if password != 'secret':
raise Exception('wrong password')
except Exception, e:
# Authentication failed.
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm="testrealm"')
self.send_header('Content-type', 'text/html')
if set_cookie_if_challenged:
self.send_header('Set-Cookie', 'got_challenged=true')
self.end_headers()
self.wfile.write('<html><head>')
self.wfile.write('<title>Denied: %s</title>' % e)
self.wfile.write('</head><body>')
self.wfile.write('auth=%s<p>' % auth)
self.wfile.write('b64str=%s<p>' % b64str)
self.wfile.write('username: %s<p>' % username)
self.wfile.write('userpass: %s<p>' % userpass)
self.wfile.write('password: %s<p>' % password)
self.wfile.write('You sent:<br>%s<p>' % self.headers)
self.wfile.write('</body></html>')
return True
# Authentication successful. (Return a cachable response to allow for
# testing cached pages that require authentication.)
if_none_match = self.headers.getheader('if-none-match')
if if_none_match == "abc":
self.send_response(304)
self.end_headers()
else:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Cache-control', 'max-age=60000')
self.send_header('Etag', 'abc')
self.end_headers()
self.wfile.write('<html><head>')
self.wfile.write('<title>%s/%s</title>' % (username, password))
self.wfile.write('</head><body>')
self.wfile.write('auth=%s<p>' % auth)
self.wfile.write('You sent:<br>%s<p>' % self.headers)
self.wfile.write('</body></html>')
return True
def AuthDigestHandler(self):
"""This handler tests 'Digest' authentication. It just sends a page with
title 'user/pass' if you succeed."""
if not self._ShouldHandleRequest("/auth-digest"):
return False
# Periodically generate a new nonce. Technically we should incorporate
# the request URL into this, but we don't care for testing.
nonce_life = 10
stale = False
if (not self.server.nonce or
(time.time() - self.server.nonce_time > nonce_life)):
if self.server.nonce:
stale = True
self.server.nonce_time = time.time()
self.server.nonce = \
_new_md5(time.ctime(self.server.nonce_time) +
'privatekey').hexdigest()
nonce = self.server.nonce
opaque = _new_md5('opaque').hexdigest()
password = 'secret'
realm = 'testrealm'
auth = self.headers.getheader('authorization')
pairs = {}
try:
if not auth:
raise Exception('no auth')
if not auth.startswith('Digest'):
raise Exception('not digest')
# Pull out all the name="value" pairs as a dictionary.
pairs = dict(re.findall(r'(\b[^ ,=]+)="?([^",]+)"?', auth))
# Make sure it's all valid.
if pairs['nonce'] != nonce:
raise Exception('wrong nonce')
if pairs['opaque'] != opaque:
raise Exception('wrong opaque')
# Check the 'response' value and make sure it matches our magic hash.
# See http://www.ietf.org/rfc/rfc2617.txt
hash_a1 = _new_md5(
':'.join([pairs['username'], realm, password])).hexdigest()
hash_a2 = _new_md5(':'.join([self.command, pairs['uri']])).hexdigest()
if 'qop' in pairs and 'nc' in pairs and 'cnonce' in pairs:
response = _new_md5(':'.join([hash_a1, nonce, pairs['nc'],
pairs['cnonce'], pairs['qop'], hash_a2])).hexdigest()
else:
response = _new_md5(':'.join([hash_a1, nonce, hash_a2])).hexdigest()
if pairs['response'] != response:
raise Exception('wrong password')
except Exception, e:
# Authentication failed.
self.send_response(401)
hdr = ('Digest '
'realm="%s", '
'domain="/", '
'qop="auth", '
'algorithm=MD5, '
'nonce="%s", '
'opaque="%s"') % (realm, nonce, opaque)
if stale:
hdr += ', stale="TRUE"'
self.send_header('WWW-Authenticate', hdr)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head>')
self.wfile.write('<title>Denied: %s</title>' % e)
self.wfile.write('</head><body>')
self.wfile.write('auth=%s<p>' % auth)
self.wfile.write('pairs=%s<p>' % pairs)
self.wfile.write('You sent:<br>%s<p>' % self.headers)
self.wfile.write('We are replying:<br>%s<p>' % hdr)
self.wfile.write('</body></html>')
return True
# Authentication successful.
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head>')
self.wfile.write('<title>%s/%s</title>' % (pairs['username'], password))
self.wfile.write('</head><body>')
self.wfile.write('auth=%s<p>' % auth)
self.wfile.write('pairs=%s<p>' % pairs)
self.wfile.write('</body></html>')
return True
def SlowServerHandler(self):
"""Wait for the user suggested time before responding. The syntax is
/slow?0.5 to wait for half a second."""
if not self._ShouldHandleRequest("/slow"):
return False
query_char = self.path.find('?')
wait_sec = 1.0
if query_char >= 0:
try:
wait_sec = int(self.path[query_char + 1:])
except ValueError:
pass
time.sleep(wait_sec)
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write("waited %d seconds" % wait_sec)
return True
def ContentTypeHandler(self):
"""Returns a string of html with the given content type. E.g.,
/contenttype?text/css returns an html file with the Content-Type
header set to text/css."""
if not self._ShouldHandleRequest("/contenttype"):
return False
query_char = self.path.find('?')
content_type = self.path[query_char + 1:].strip()
if not content_type:
content_type = 'text/html'
self.send_response(200)
self.send_header('Content-Type', content_type)
self.end_headers()
self.wfile.write("<html>\n<body>\n<p>HTML text</p>\n</body>\n</html>\n");
return True
def ServerRedirectHandler(self):
"""Sends a server redirect to the given URL. The syntax is
'/server-redirect?http://foo.bar/asdf' to redirect to
'http://foo.bar/asdf'"""
test_name = "/server-redirect"
if not self._ShouldHandleRequest(test_name):
return False
query_char = self.path.find('?')
if query_char < 0 or len(self.path) <= query_char + 1:
self.sendRedirectHelp(test_name)
return True
dest = self.path[query_char + 1:]
self.send_response(301) # moved permanently
self.send_header('Location', dest)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head>')
self.wfile.write('</head><body>Redirecting to %s</body></html>' % dest)
return True
def ClientRedirectHandler(self):
"""Sends a client redirect to the given URL. The syntax is
'/client-redirect?http://foo.bar/asdf' to redirect to
'http://foo.bar/asdf'"""
test_name = "/client-redirect"
if not self._ShouldHandleRequest(test_name):
return False
query_char = self.path.find('?');
if query_char < 0 or len(self.path) <= query_char + 1:
self.sendRedirectHelp(test_name)
return True
dest = self.path[query_char + 1:]
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><head>')
self.wfile.write('<meta http-equiv="refresh" content="0;url=%s">' % dest)
self.wfile.write('</head><body>Redirecting to %s</body></html>' % dest)
return True
def DefaultResponseHandler(self):
"""This is the catch-all response handler for requests that aren't handled
by one of the special handlers above.
Note that we specify the content-length as without it the https connection
is not closed properly (and the browser keeps expecting data)."""
contents = "Default response given for path: " + self.path
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header("Content-Length", len(contents))
self.end_headers()
self.wfile.write(contents)
return True
def RedirectConnectHandler(self):
"""Sends a redirect to the CONNECT request for www.redirect.com. This
response is not specified by the RFC, so the browser should not follow
the redirect."""
if (self.path.find("www.redirect.com") < 0):
return False
dest = "http://www.destination.com/foo.js"
self.send_response(302) # moved temporarily
self.send_header('Location', dest)
self.send_header('Connection', 'close')
self.end_headers()
return True
def ServerAuthConnectHandler(self):
"""Sends a 401 to the CONNECT request for www.server-auth.com. This
response doesn't make sense because the proxy server cannot request
server authentication."""
if (self.path.find("www.server-auth.com") < 0):
return False
challenge = 'Basic realm="WallyWorld"'
self.send_response(401) # unauthorized
self.send_header('WWW-Authenticate', challenge)
self.send_header('Connection', 'close')
self.end_headers()
return True
def DefaultConnectResponseHandler(self):
"""This is the catch-all response handler for CONNECT requests that aren't
handled by one of the special handlers above. Real Web servers respond
with 400 to CONNECT requests."""
contents = "Your client has issued a malformed or illegal request."
self.send_response(400) # bad request
self.send_header('Content-type', 'text/html')
self.send_header("Content-Length", len(contents))
self.end_headers()
self.wfile.write(contents)
return True
def do_CONNECT(self):
for handler in self._connect_handlers:
if handler():
return
def do_GET(self):
for handler in self._get_handlers:
if handler():
return
def do_POST(self):
for handler in self._post_handlers:
if handler():
return
# called by the redirect handling function when there is no parameter
def sendRedirectHelp(self, redirect_name):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('<html><body><h1>Error: no redirect destination</h1>')
self.wfile.write('Use <pre>%s?http://dest...</pre>' % redirect_name)
self.wfile.write('</body></html>')
def MakeDumpDir(data_dir):
"""Create directory named 'dump' where uploaded data via HTTP POST request
will be stored. If the directory already exists all files and subdirectories
will be deleted."""
dump_dir = os.path.join(data_dir, 'dump');
if os.path.isdir(dump_dir):
shutil.rmtree(dump_dir)
os.mkdir(dump_dir)
def MakeDataDir():
if options.data_dir:
if not os.path.isdir(options.data_dir):
print 'specified data dir not found: ' + options.data_dir + ' exiting...'
return None
my_data_dir = options.data_dir
else:
# Create the default path to our data dir, relative to the exe dir.
my_data_dir = os.path.dirname(sys.argv[0])
my_data_dir = os.path.join(my_data_dir, "..", "..", "..", "..",
"test", "data")
#TODO(ibrar): Must use Find* funtion defined in google\tools
#i.e my_data_dir = FindUpward(my_data_dir, "test", "data")
return my_data_dir
def main(options, args):
# redirect output to a log file so it doesn't spam the unit test output
logfile = open('testserver.log', 'w')
sys.stderr = sys.stdout = logfile
port = options.port
if options.server_type == SERVER_HTTP:
if options.cert:
# let's make sure the cert file exists.
if not os.path.isfile(options.cert):
print 'specified cert file not found: ' + options.cert + ' exiting...'
return
server = HTTPSServer(('127.0.0.1', port), TestPageHandler, options.cert)
print 'HTTPS server started on port %d...' % port
else:
server = StoppableHTTPServer(('127.0.0.1', port), TestPageHandler)
print 'HTTP server started on port %d...' % port
server.data_dir = MakeDataDir()
server.file_root_url = options.file_root_url
MakeDumpDir(server.data_dir)
# means FTP Server
else:
my_data_dir = MakeDataDir()
def line_logger(msg):
if (msg.find("kill") >= 0):
server.stop = True
print 'shutting down server'
sys.exit(0)
# Instantiate a dummy authorizer for managing 'virtual' users
authorizer = pyftpdlib.ftpserver.DummyAuthorizer()
# Define a new user having full r/w permissions and a read-only
# anonymous user
authorizer.add_user('chrome', 'chrome', my_data_dir, perm='elradfmw')
authorizer.add_anonymous(my_data_dir)
# Instantiate FTP handler class
ftp_handler = pyftpdlib.ftpserver.FTPHandler
ftp_handler.authorizer = authorizer
pyftpdlib.ftpserver.logline = line_logger
# Define a customized banner (string returned when client connects)
ftp_handler.banner = ("pyftpdlib %s based ftpd ready." %
pyftpdlib.ftpserver.__ver__)
# Instantiate FTP server class and listen to 127.0.0.1:port
address = ('127.0.0.1', port)
server = pyftpdlib.ftpserver.FTPServer(address, ftp_handler)
print 'FTP server started on port %d...' % port
try:
server.serve_forever()
except KeyboardInterrupt:
print 'shutting down server'
server.stop = True
if __name__ == '__main__':
option_parser = optparse.OptionParser()
option_parser.add_option("-f", '--ftp', action='store_const',
const=SERVER_FTP, default=SERVER_HTTP,
dest='server_type',
help='FTP or HTTP server default HTTP')
option_parser.add_option('', '--port', default='8888', type='int',
help='Port used by the server')
option_parser.add_option('', '--data-dir', dest='data_dir',
help='Directory from which to read the files')
option_parser.add_option('', '--https', dest='cert',
help='Specify that https should be used, specify '
'the path to the cert containing the private key '
'the server should use')
option_parser.add_option('', '--file-root-url', default='/files/',
help='Specify a root URL for files served.')
options, args = option_parser.parse_args()
sys.exit(main(options, args))