blob: aac7bb2cddf6be7ea026f439cf7a6b110b137e68 [file] [log] [blame]
Keita Suzuki77233ae2020-02-04 12:55:08 +09001# Copyright 2020, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14# * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29"""Standalone WebsocketServer
30
31This file deals with the main module of standalone server. Although it is fine
32to import this file directly to use WebSocketServer, it is strongly recommended
33to use standalone.py, since it is intended to act as a skeleton of this module.
34"""
35
36from __future__ import absolute_import
37from six.moves import BaseHTTPServer
38from six.moves import socketserver
39import logging
40import re
41import select
42import socket
43import ssl
44import threading
45import traceback
46
47from mod_pywebsocket import dispatch
48from mod_pywebsocket import util
49from mod_pywebsocket.request_handler import WebSocketRequestHandler
50
51
def _alias_handlers(dispatcher, websock_handlers_map_file):
    """Set aliases specified in websock_handlers_map_file in dispatcher.

    Every line of the map file that is neither a comment (first character is
    '#') nor whitespace-only must consist of exactly two whitespace-separated
    tokens. The two tokens are passed, in file order, to
    dispatcher.add_resource_path_alias(). Malformed lines are reported with
    logging.warning() and skipped. A dispatch.DispatchException raised while
    registering an alias is reported with logging.error() and processing
    continues with the next line.

    Args:
        dispatcher: dispatch.Dispatcher instance
        websock_handlers_map_file: alias map file
    """

    # Use a raw string: '\S' and '\s' are not valid string escapes, and a
    # plain literal containing them triggers a SyntaxWarning on recent Python
    # versions. Compiling once also keeps the pattern out of the loop.
    alias_line = re.compile(r'(\S+)\s+(\S+)$')

    with open(websock_handlers_map_file) as f:
        for line in f:
            if line.startswith('#') or line.isspace():
                continue
            m = alias_line.match(line)
            if not m:
                logging.warning('Wrong format in map file:' + line)
                continue
            try:
                dispatcher.add_resource_path_alias(m.group(1), m.group(2))
            except dispatch.DispatchException as e:
                logging.error(str(e))
72
73
class WebSocketServer(socketserver.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """HTTPServer specialized for WebSocket.

    Unlike the base HTTPServer, this server may listen on several sockets at
    the same time. They are kept in self._sockets as a list of
    (socket, addrinfo) pairs, and self.socket is pointed at whichever one is
    currently being served inside serve_forever().
    """

    # Overrides SocketServer.ThreadingMixIn.daemon_threads
    daemon_threads = True
    # Overrides BaseHTTPServer.HTTPServer.allow_reuse_address
    allow_reuse_address = True

    def __init__(self, options):
        """Override SocketServer.TCPServer.__init__ to set SSL enabled
        socket object to self.socket before server_bind and server_activate,
        if necessary.

        Args:
            options: configuration object. This method reads
                websock_handlers, scan_dir, allow_handlers_outside_root_dir,
                websock_handlers_map_file, request_queue_size, server_host
                and port from it, and stores a dispatch.Dispatcher on it as
                options.dispatcher. The TLS related attributes are read when
                the sockets are created.
        """

        # Share a Dispatcher among request handlers to save time for
        # instantiation. Dispatcher can be shared because it is thread-safe.
        options.dispatcher = dispatch.Dispatcher(
            options.websock_handlers, options.scan_dir,
            options.allow_handlers_outside_root_dir)
        if options.websock_handlers_map_file:
            _alias_handlers(options.dispatcher,
                            options.websock_handlers_map_file)
        warnings = options.dispatcher.source_warnings()
        if warnings:
            for warning in warnings:
                logging.warning('Warning in source loading: %s', warning)

        self._logger = util.get_class_logger(self)

        self.request_queue_size = options.request_queue_size
        # Set by serve_forever() when its loop has ended; shutdown() waits on
        # it.
        self.__ws_is_shut_down = threading.Event()
        # Loop condition of serve_forever(); cleared by shutdown().
        self.__ws_serving = False

        # Only BaseServer.__init__ is called (not TCPServer.__init__), so the
        # sockets are created, bound and activated by the calls below.
        socketserver.BaseServer.__init__(self,
                                         (options.server_host, options.port),
                                         WebSocketRequestHandler)

        # Expose the options object to allow handler objects access it. We name
        # it with websocket_ prefix to avoid conflict.
        self.websocket_server_options = options

        self._create_sockets()
        self.server_bind()
        self.server_activate()

    def _create_sockets(self):
        """Create the (still unbound) listening sockets.

        Fills self._sockets with one (socket, addrinfo) pair per address
        family that could be created, wrapping each socket with TLS when
        use_tls is set in the server options. Also sets self.server_name and
        self.server_port from self.server_address.
        """

        self.server_name, self.server_port = self.server_address
        self._sockets = []
        if not self.server_name:
            # On platforms that don't support IPv6, the first bind fails.
            # On platforms that support IPv6
            # - If it binds both IPv4 and IPv6 on call with AF_INET6, the
            #   first bind succeeds and the second fails (we'll see 'Address
            #   already in use' error).
            # - If it binds only IPv6 on call with AF_INET6, both calls are
            #   expected to succeed to listen on both protocols.
            addrinfo_array = [(socket.AF_INET6, socket.SOCK_STREAM, '', '',
                               ''),
                              (socket.AF_INET, socket.SOCK_STREAM, '', '', '')]
        else:
            addrinfo_array = socket.getaddrinfo(self.server_name,
                                                self.server_port,
                                                socket.AF_UNSPEC,
                                                socket.SOCK_STREAM,
                                                socket.IPPROTO_TCP)
        for addrinfo in addrinfo_array:
            self._logger.info('Create socket on: %r', addrinfo)
            family, socktype = addrinfo[0], addrinfo[1]
            try:
                socket_ = socket.socket(family, socktype)
            except Exception as e:
                self._logger.info('Skip by failure: %r', e)
                continue
            if self.websocket_server_options.use_tls:
                socket_ = self._wrap_tls_socket(socket_)
            self._sockets.append((socket_, addrinfo))

    def _wrap_tls_socket(self, socket_):
        """Return socket_ wrapped as a server-side TLS socket.

        The certificate, private key, client CA file and the client
        certificate policy are taken from self.websocket_server_options.
        """

        server_options = self.websocket_server_options
        if server_options.tls_client_auth:
            if server_options.tls_client_cert_optional:
                client_cert_ = ssl.CERT_OPTIONAL
            else:
                client_cert_ = ssl.CERT_REQUIRED
        else:
            client_cert_ = ssl.CERT_NONE

        # ssl.wrap_socket() is deprecated since Python 3.7 and was removed in
        # Python 3.12, so the SSLContext is built explicitly here.
        protocol = getattr(ssl, 'PROTOCOL_TLS_SERVER', None)
        if protocol is None:
            # Older ssl modules without PROTOCOL_TLS_SERVER.
            protocol = ssl.PROTOCOL_SSLv23
        context = ssl.SSLContext(protocol)
        context.verify_mode = client_cert_
        if server_options.tls_client_ca:
            context.load_verify_locations(server_options.tls_client_ca)
        if server_options.certificate:
            context.load_cert_chain(server_options.certificate,
                                    server_options.private_key)
        return context.wrap_socket(socket_, server_side=True)

    def server_bind(self):
        """Override SocketServer.TCPServer.server_bind to enable multiple
        sockets bind.

        Sockets that fail to bind are closed and dropped from self._sockets.
        """

        failed_sockets = []

        for socketinfo in self._sockets:
            socket_, addrinfo = socketinfo
            self._logger.info('Bind on: %r', addrinfo)
            if self.allow_reuse_address:
                socket_.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                socket_.bind(self.server_address)
            except Exception as e:
                self._logger.info('Skip by failure: %r', e)
                socket_.close()
                failed_sockets.append(socketinfo)
                # The socket is closed now; it must not be asked for its
                # assigned port below.
                continue
            if self.server_address[1] == 0:
                # The operating system assigns the actual port number for port
                # number 0. This case, the second and later sockets should use
                # the same port number. Also self.server_port is rewritten
                # because it is exported, and will be used by external code.
                self.server_address = (self.server_name,
                                       socket_.getsockname()[1])
                self.server_port = self.server_address[1]
                self._logger.info('Port %r is assigned', self.server_port)

        for socketinfo in failed_sockets:
            self._sockets.remove(socketinfo)

    def server_activate(self):
        """Override SocketServer.TCPServer.server_activate to enable multiple
        sockets listen.

        Sockets that fail to listen are closed and dropped from
        self._sockets. A critical message is logged when none is left.
        """

        failed_sockets = []

        for socketinfo in self._sockets:
            socket_, addrinfo = socketinfo
            self._logger.info('Listen on: %r', addrinfo)
            try:
                socket_.listen(self.request_queue_size)
            except Exception as e:
                self._logger.info('Skip by failure: %r', e)
                socket_.close()
                failed_sockets.append(socketinfo)

        for socketinfo in failed_sockets:
            self._sockets.remove(socketinfo)

        if not self._sockets:
            self._logger.critical(
                'No sockets activated. Use info log level to see the reason.')

    def server_close(self):
        """Override SocketServer.TCPServer.server_close to enable multiple
        sockets close.
        """

        for socketinfo in self._sockets:
            socket_, addrinfo = socketinfo
            self._logger.info('Close on: %r', addrinfo)
            socket_.close()

    def fileno(self):
        """Override SocketServer.TCPServer.fileno.

        A single file descriptor cannot represent several listening sockets,
        so a critical message is logged and the descriptor of the first
        socket is returned.
        """

        self._logger.critical('Not supported: fileno')
        return self._sockets[0][0].fileno()

    def handle_error(self, request, client_address):
        """Override SocketServer.handle_error.

        Logs the traceback of the exception currently being handled together
        with the client address.
        """

        self._logger.error('Exception in processing request from: %r\n%s',
                           client_address, traceback.format_exc())
        # Note: client_address is a tuple.

    def get_request(self):
        """Override TCPServer.get_request.

        Accepts a connection on self.socket, which serve_forever() has set to
        the listening socket reported readable by select().

        Returns:
            (accepted socket, client address) as returned by accept().
        """

        accepted_socket, client_address = self.socket.accept()

        server_options = self.websocket_server_options
        if server_options.use_tls:
            # Print cipher in use. Handshake is done on accept.
            self._logger.debug('Cipher: %s', accepted_socket.cipher())
            self._logger.debug('Client cert: %r',
                               accepted_socket.getpeercert())

        return accepted_socket, client_address

    def serve_forever(self, poll_interval=0.5):
        """Override SocketServer.BaseServer.serve_forever.

        Polls all listening sockets with select() and handles one request on
        each socket reported readable, until shutdown() is called.

        Args:
            poll_interval: select() timeout in seconds; this bounds how long
                it takes for the loop to notice a shutdown() request.
        """

        self.__ws_serving = True
        self.__ws_is_shut_down.clear()
        handle_request = self.handle_request
        if hasattr(self, '_handle_request_noblock'):
            handle_request = self._handle_request_noblock
        else:
            self._logger.warning('Fallback to blocking request handler')
        try:
            while self.__ws_serving:
                listening = [socketinfo[0] for socketinfo in self._sockets]
                readable, _, _ = select.select(listening, [], [],
                                               poll_interval)
                for socket_ in readable:
                    # The request handling machinery of the base class reads
                    # self.socket, so point it at the ready socket for the
                    # duration of the call.
                    self.socket = socket_
                    handle_request()
                    self.socket = None
        finally:
            self.__ws_is_shut_down.set()

    def shutdown(self):
        """Override SocketServer.BaseServer.shutdown.

        Asks serve_forever() to stop and blocks until its loop has exited.
        """

        self.__ws_serving = False
        self.__ws_is_shut_down.wait()
283
284
285# vi:sts=4 sw=4 et