[3] | 1 | # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) |
---|
| 2 | # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php |
---|
| 3 | # (c) 2005 Clark C. Evans |
---|
| 4 | # This module is part of the Python Paste Project and is released under |
---|
| 5 | # the MIT License: http://www.opensource.org/licenses/mit-license.php |
---|
| 6 | # This code was written with funding by http://prometheusresearch.com |
---|
| 7 | """ |
---|
| 8 | WSGI HTTP Server |
---|
| 9 | |
---|
| 10 | This is a minimalistic WSGI server using Python's built-in BaseHTTPServer; |
---|
| 11 | if pyOpenSSL is installed, it also provides SSL capabilities. |
---|
| 12 | """ |
---|
| 13 | |
---|
| 14 | # @@: add in protection against HTTP/1.0 clients who claim to |
---|
| 15 | # be 1.1 but do not send a Content-Length |
---|
| 16 | |
---|
| 17 | # @@: add support for chunked encoding, this is not a 1.1 server |
---|
| 18 | # till this is completed. |
---|
| 19 | |
---|
| 20 | import atexit |
---|
| 21 | import traceback |
---|
| 22 | import socket, sys, threading, urlparse, Queue, urllib |
---|
| 23 | import posixpath |
---|
| 24 | import time |
---|
| 25 | import thread |
---|
| 26 | import os |
---|
| 27 | from itertools import count |
---|
| 28 | from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
---|
| 29 | from SocketServer import ThreadingMixIn |
---|
| 30 | from paste.util import converters |
---|
| 31 | import logging |
---|
| 32 | try: |
---|
| 33 | from paste.util import killthread |
---|
| 34 | except ImportError: |
---|
| 35 | # Not available, probably no ctypes |
---|
| 36 | killthread = None |
---|
| 37 | |
---|
| 38 | __all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve'] |
---|
| 39 | __version__ = "0.5" |
---|
| 40 | |
---|
class ContinueHook(object):
    """
    When a client request includes a 'Expect: 100-continue' header, then
    it is the responsibility of the server to send 100 Continue when it
    is ready for the content body. This allows authentication, access
    levels, and other exceptions to be detected *before* bandwidth is
    spent on the request body.

    This is a rfile wrapper that implements this functionality by
    sending 100 Continue to the client immediately after the user
    requests the content via a read() operation on the rfile stream.
    After this response is sent, it becomes a pass-through object.
    """

    def __init__(self, rfile, write):
        self._ContinueFile_rfile = rfile
        self._ContinueFile_write = write
        # Mirror the passive attributes of the wrapped file object.
        for attr in ('close', 'closed', 'fileno', 'flush',
                     'mode', 'bufsize', 'softspace'):
            if hasattr(rfile, attr):
                setattr(self, attr, getattr(rfile, attr))
        # Route the read methods through the interposers below so the
        # first read triggers the 100 Continue response.
        for attr in ('read', 'readline', 'readlines'):
            if hasattr(rfile, attr):
                setattr(self, attr, getattr(self, '_ContinueFile_' + attr))

    def _ContinueFile_send(self):
        # Emit the interim response, then replace the interposer
        # methods with the real file's methods (pass-through mode).
        self._ContinueFile_write("HTTP/1.1 100 Continue\r\n\r\n")
        rfile = self._ContinueFile_rfile
        for attr in ('read', 'readline', 'readlines'):
            if hasattr(rfile, attr):
                setattr(self, attr, getattr(rfile, attr))

    def _ContinueFile_read(self, size=-1):
        self._ContinueFile_send()
        # Bug fix: this used to delegate to readline(size), which
        # truncated the first body read at the first newline; delegate
        # to read() as the method name promises.
        return self._ContinueFile_rfile.read(size)

    def _ContinueFile_readline(self, size=-1):
        self._ContinueFile_send()
        return self._ContinueFile_rfile.readline(size)

    def _ContinueFile_readlines(self, sizehint=0):
        self._ContinueFile_send()
        return self._ContinueFile_rfile.readlines(sizehint)
---|
| 84 | |
---|
class WSGIHandlerMixin:
    """
    WSGI mix-in for HTTPRequestHandler

    This class is a mix-in to provide WSGI functionality to any
    HTTPRequestHandler derivative (as provided in Python's BaseHTTPServer).
    This assumes a ``wsgi_application`` handler on ``self.server``.
    """
    # When true, wsgi_setup() may set REMOTE_HOST from a reverse DNS
    # lookup; currently a no-op (see the note inside wsgi_setup).
    lookup_addresses = True

    def log_request(self, *args, **kwargs):
        """ disable success request logging

        Logging transactions should not be part of a WSGI server,
        if you want logging; look at paste.translogger
        """
        pass

    def log_message(self, *args, **kwargs):
        """ disable error message logging

        Logging transactions should not be part of a WSGI server,
        if you want logging; look at paste.translogger
        """
        pass

    def version_string(self):
        """ behavior that BaseHTTPServer should have had """
        if not self.sys_version:
            return self.server_version
        else:
            return self.server_version + ' ' + self.sys_version

    def wsgi_write_chunk(self, chunk):
        """
        Write a chunk of the output stream; send headers if they
        have not already been sent.

        This is both the PEP 333 ``write()`` callable returned by
        ``start_response`` and the sink used for the application's
        result iterable.
        """
        if not self.wsgi_headers_sent and not self.wsgi_curr_headers:
            raise RuntimeError(
                "Content returned before start_response called")
        if not self.wsgi_headers_sent:
            self.wsgi_headers_sent = True
            (status, headers) = self.wsgi_curr_headers
            code, message = status.split(" ", 1)
            self.send_response(int(code), message)
            #
            # HTTP/1.1 compliance; either send Content-Length or
            # signal that the connection is being closed.
            #
            send_close = True
            for (k, v) in headers:
                lk = k.lower()
                if 'content-length' == lk:
                    send_close = False
                if 'connection' == lk:
                    if 'close' == v.lower():
                        self.close_connection = 1
                        send_close = False
                self.send_header(k, v)
            if send_close:
                # No Content-Length was supplied: the client can only
                # find the end of the body by the connection closing.
                self.close_connection = 1
                self.send_header('Connection', 'close')

            self.end_headers()
        self.wfile.write(chunk)

    def wsgi_start_response(self, status, response_headers, exc_info=None):
        # PEP 333 ``start_response`` callable; stores the status and
        # headers until the first body write, and returns ``write()``.
        if exc_info:
            try:
                if self.wsgi_headers_sent:
                    # Headers are already on the wire; the only option
                    # left is to re-raise and abort the response.
                    raise exc_info[0], exc_info[1], exc_info[2]
                else:
                    # In this case, we're going to assume that the
                    # higher-level code is currently handling the
                    # issue and returning a reasonable response.
                    # self.log_error(repr(exc_info))
                    pass
            finally:
                # Break the traceback reference cycle, per PEP 333.
                exc_info = None
        elif self.wsgi_curr_headers:
            assert 0, "Attempt to set headers a second time w/o an exc_info"
        self.wsgi_curr_headers = (status, response_headers)
        return self.wsgi_write_chunk

    def wsgi_setup(self, environ=None):
        """
        Setup the member variables used by this WSGI mixin, including
        the ``environ`` and status member variables.

        After the basic environment is created; the optional ``environ``
        argument can be used to override any settings.
        """

        (scheme, netloc, path, query, fragment) = urlparse.urlsplit(self.path)
        path = urllib.unquote(path)
        endslash = path.endswith('/')
        path = posixpath.normpath(path)
        if endslash and path != '/':
            # Put the slash back...
            path += '/'
        (server_name, server_port) = self.server.server_address

        rfile = self.rfile
        if 'HTTP/1.1' == self.protocol_version and \
                '100-continue' == self.headers.get('Expect','').lower():
            # Defer the body: 100 Continue is sent only when the
            # application first reads from wsgi.input.
            rfile = ContinueHook(rfile, self.wfile.write)
        else:
            # We can put in the protection to keep from over-reading the
            # file
            try:
                content_length = int(self.headers.get('Content-Length', '0'))
            except ValueError:
                content_length = 0
            if not hasattr(self.connection, 'get_context'):
                # @@: LimitedLengthFile is currently broken in connection
                # with SSL (sporadic errors that are difficult to trace, but
                # ones that go away when you don't use LimitedLengthFile)
                rfile = LimitedLengthFile(rfile, content_length)

        remote_address = self.client_address[0]
        self.wsgi_environ = {
                'wsgi.version': (1,0)
               ,'wsgi.url_scheme': 'http'
               ,'wsgi.input': rfile
               ,'wsgi.errors': sys.stderr
               ,'wsgi.multithread': True
               ,'wsgi.multiprocess': False
               ,'wsgi.run_once': False
               # CGI variables required by PEP-333
               ,'REQUEST_METHOD': self.command
               ,'SCRIPT_NAME': ''  # application is root of server
               ,'PATH_INFO': path
               ,'QUERY_STRING': query
               ,'CONTENT_TYPE': self.headers.get('Content-Type', '')
               ,'CONTENT_LENGTH': self.headers.get('Content-Length', '0')
               ,'SERVER_NAME': server_name
               ,'SERVER_PORT': str(server_port)
               ,'SERVER_PROTOCOL': self.request_version
               # CGI not required by PEP-333
               ,'REMOTE_ADDR': remote_address
               }
        if scheme:
            # A proxied request carries the original scheme/host in the
            # request line; expose them for downstream middleware.
            self.wsgi_environ['paste.httpserver.proxy.scheme'] = scheme
        if netloc:
            self.wsgi_environ['paste.httpserver.proxy.host'] = netloc

        if self.lookup_addresses:
            # @@: make lookup_addresses actually work; at this point
            # address_string() is overridden further down in this file
            # and hence is a noop
            if remote_address.startswith("192.168.") \
                    or remote_address.startswith("10.") \
                    or remote_address.startswith("172.16."):
                # Private (RFC 1918) addresses are never resolved.
                pass
            else:
                address_string = None  # self.address_string()
                if address_string:
                    self.wsgi_environ['REMOTE_HOST'] = address_string

        if hasattr(self.server, 'thread_pool'):
            # Now that we know what the request was for, we should
            # tell the thread pool what its worker is working on
            self.server.thread_pool.worker_tracker[thread.get_ident()][1] = self.wsgi_environ
            self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool

        for k, v in self.headers.items():
            key = 'HTTP_' + k.replace("-","_").upper()
            if key in ('HTTP_CONTENT_TYPE','HTTP_CONTENT_LENGTH'):
                # Already present above under their CGI names.
                continue
            self.wsgi_environ[key] = ','.join(self.headers.getheaders(k))

        if hasattr(self.connection,'get_context'):
            # Only pyOpenSSL connections expose get_context(); its
            # presence marks this connection as SSL.
            self.wsgi_environ['wsgi.url_scheme'] = 'https'
            # @@: extract other SSL parameters from pyOpenSSL at...
            # http://www.modssl.org/docs/2.8/ssl_reference.html#ToC25

        if environ:
            assert isinstance(environ, dict)
            self.wsgi_environ.update(environ)
            if 'on' == environ.get('HTTPS'):
                self.wsgi_environ['wsgi.url_scheme'] = 'https'

        # Response state consumed by wsgi_start_response/wsgi_write_chunk.
        self.wsgi_curr_headers = None
        self.wsgi_headers_sent = False

    def wsgi_connection_drop(self, exce, environ=None):
        """
        Override this if you're interested in socket exceptions, such
        as when the user clicks 'Cancel' during a file download.
        """
        pass

    def wsgi_execute(self, environ=None):
        """
        Invoke the server's ``wsgi_application``.
        """

        self.wsgi_setup(environ)

        try:
            result = self.server.wsgi_application(self.wsgi_environ,
                                                  self.wsgi_start_response)
            try:
                for chunk in result:
                    self.wsgi_write_chunk(chunk)
                if not self.wsgi_headers_sent:
                    # Force the headers out even for an empty body.
                    self.wsgi_write_chunk('')
            finally:
                if hasattr(result,'close'):
                    # PEP 333: the iterable's close() must always run.
                    result.close()
                result = None
        except socket.error, exce:
            # Client went away mid-response; not the app's fault.
            self.wsgi_connection_drop(exce, environ)
            return
        except:
            if not self.wsgi_headers_sent:
                error_msg = "Internal Server Error\n"
                self.wsgi_curr_headers = (
                    '500 Internal Server Error',
                    [('Content-type', 'text/plain'),
                     ('Content-length', str(len(error_msg)))])
                self.wsgi_write_chunk("Internal Server Error\n")
            raise
---|
| 309 | |
---|
#
# SSL Functionality
#
# This implementation was motivated by Sebastien Martini's SSL example
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
#
try:
    from OpenSSL import SSL, tsafe
    # Include pyOpenSSL's disconnect exceptions so client drops can be
    # caught uniformly via SocketErrors, with or without SSL.
    SocketErrors = (socket.error, SSL.ZeroReturnError, SSL.SysCallError)
except ImportError:
    # Do not require pyOpenSSL to be installed, but disable SSL
    # functionality in that case.
    SSL = None
    SocketErrors = (socket.error,)
    class SecureHTTPServer(HTTPServer):
        # Stub that keeps the name importable; any attempt to actually
        # use SSL trips the assertion below.
        def __init__(self, server_address, RequestHandlerClass,
                     ssl_context=None):
            assert not ssl_context, "pyOpenSSL not installed"
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
else:

    class _ConnFixer(object):
        """ wraps a socket connection so it implements makefile """
        def __init__(self, conn):
            self.__conn = conn
        def makefile(self, mode, bufsize):
            # The SSL connection object lacks makefile(); delegate to
            # the stdlib file-object wrapper over the raw connection.
            return socket._fileobject(self.__conn, mode, bufsize)
        def __getattr__(self, attrib):
            return getattr(self.__conn, attrib)

    class SecureHTTPServer(HTTPServer):
        """
        Provides SSL server functionality on top of the BaseHTTPServer
        by overriding _private_ members of Python's standard
        distribution. The interface for this instance only changes by
        adding an optional ssl_context attribute to the constructor:

              cntx = SSL.Context(SSL.SSLv23_METHOD)
              cntx.use_privatekey_file("host.pem")
              cntx.use_certificate_file("host.pem")

        """

        def __init__(self, server_address, RequestHandlerClass,
                     ssl_context=None):
            # This overrides the implementation of __init__ in python's
            # SocketServer.TCPServer (which BaseHTTPServer.HTTPServer
            # does not override, thankfully).
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
            # Replace the socket created by the base class with one we
            # can wrap in an SSL connection before bind/activate.
            self.socket = socket.socket(self.address_family,
                                        self.socket_type)
            self.ssl_context = ssl_context
            if ssl_context:
                # tsafe.Connection serializes operations on the SSL
                # connection under a lock, but omits settimeout();
                # add it under the same lock.
                class TSafeConnection(tsafe.Connection):
                    def settimeout(self, *args):
                        self._lock.acquire()
                        try:
                            return self._ssl_conn.settimeout(*args)
                        finally:
                            self._lock.release()
                self.socket = TSafeConnection(ssl_context, self.socket)
            self.server_bind()
            self.server_activate()

        def get_request(self):
            # The default SSL request object does not seem to have a
            # ``makefile(mode, bufsize)`` method as expected by
            # Socketserver.StreamRequestHandler.
            (conn, info) = self.socket.accept()
            if self.ssl_context:
                conn = _ConnFixer(conn)
            return (conn, info)
---|
| 382 | |
---|
def _auto_ssl_context():
    """
    Build and return an SSL.Context backed by a freshly generated,
    self-signed throwaway certificate.

    Used when SSL is requested without an explicit certificate; the
    certificate has a wildcard subject and an untrusted issuer, so
    clients will (correctly) warn about it.
    """
    import OpenSSL, time, random
    pkey = OpenSSL.crypto.PKey()
    # Security fix: the previous 768-bit key and MD5 signature are
    # rejected by modern TLS stacks; use 2048-bit RSA and SHA-256.
    pkey.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

    cert = OpenSSL.crypto.X509()

    # Random serial so repeated runs don't collide in client caches.
    cert.set_serial_number(random.randint(0, sys.maxint))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)  # valid for one year
    cert.get_subject().CN = '*'
    cert.get_subject().O = 'Dummy Certificate'
    cert.get_issuer().CN = 'Untrusted Authority'
    cert.get_issuer().O = 'Self-Signed'
    cert.set_pubkey(pkey)
    cert.sign(pkey, 'sha256')

    ctx = SSL.Context(SSL.SSLv23_METHOD)
    ctx.use_privatekey(pkey)
    ctx.use_certificate(cert)

    return ctx
---|
| 405 | |
---|
class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler):
    """
    A WSGI handler that overrides POST, GET and HEAD to delegate
    requests to the server's ``wsgi_application``.
    """
    server_version = 'PasteWSGIServer/' + __version__

    def handle_one_request(self):
        """Handle a single HTTP request.

        You normally don't need to override this method; see the class
        __doc__ string for information on how to handle specific HTTP
        commands such as GET and POST.

        """
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            # Client closed the connection without sending a request.
            self.close_connection = 1
            return
        if not self.parse_request(): # An error code has been sent, just exit
            return
        # Every parsed request, regardless of method, is handed to the
        # WSGI application (no per-method do_GET/do_POST dispatch).
        self.wsgi_execute()

    def handle(self):
        # don't bother logging disconnects while handling a request
        try:
            BaseHTTPRequestHandler.handle(self)
        except SocketErrors, exce:
            self.wsgi_connection_drop(exce)

    def address_string(self):
        """Return the client address formatted for logging.

        This is overridden so that no hostname lookup is done.
        """
        return ''
---|
| 442 | |
---|
class LimitedLengthFile(object):
    """
    File-like wrapper that caps reads at ``length`` bytes, protecting
    against reading the underlying (socket) file past the request
    body's Content-Length.
    """

    def __init__(self, file, length):
        self.file = file         # the wrapped file-like object
        self.length = length     # maximum bytes this wrapper may yield
        self._consumed = 0       # bytes handed out so far

    def __repr__(self):
        base_repr = repr(self.file)
        return base_repr[:-1] + ' length=%s>' % self.length

    def read(self, length=None):
        """Read up to ``length`` bytes, never past the remaining limit."""
        left = self.length - self._consumed
        if length is None:
            length = left
        else:
            length = min(length, left)
        # next two lines are necessary only if read(0) blocks
        if not left:
            return ''
        data = self.file.read(length)
        self._consumed += len(data)
        return data

    def readline(self, *args):
        """Read one line, honoring both the caller's optional size hint
        and the remaining length limit.

        Bug fix: the size argument used to be silently ignored; it is
        now respected (capped at the bytes remaining).
        """
        left = self.length - self._consumed
        if args and args[0] is not None and args[0] >= 0:
            size = min(args[0], left)
        else:
            size = left
        data = self.file.readline(size)
        self._consumed += len(data)
        return data

    def readlines(self, hint=None):
        # NOTE(review): delegates straight to the wrapped file, so the
        # hint-based path is not capped at ``length``; kept as-is to
        # preserve behavior.
        data = self.file.readlines(hint)
        for chunk in data:
            self._consumed += len(chunk)
        return data

    def __iter__(self):
        return self

    def next(self):
        if self.length - self._consumed <= 0:
            raise StopIteration
        return self.readline()

    # Backward-compatible addition: Python 3 iteration protocol.
    __next__ = next

    ## Optional methods ##

    def seek(self, place):
        self.file.seek(place)
        self._consumed = place

    def tell(self):
        if hasattr(self.file, 'tell'):
            return self.file.tell()
        else:
            # Fall back to our own bookkeeping for unseekable streams.
            return self._consumed
---|
| 496 | |
---|
| 497 | class ThreadPool(object): |
---|
| 498 | """ |
---|
| 499 | Generic thread pool with a queue of callables to consume. |
---|
| 500 | |
---|
| 501 | Keeps a notion of the status of its worker threads: |
---|
| 502 | |
---|
| 503 | idle: worker thread with nothing to do |
---|
| 504 | |
---|
| 505 | busy: worker thread doing its job |
---|
| 506 | |
---|
| 507 | hung: worker thread that's been doing a job for too long |
---|
| 508 | |
---|
| 509 | dying: a hung thread that has been killed, but hasn't died quite |
---|
| 510 | yet. |
---|
| 511 | |
---|
| 512 | zombie: what was a worker thread that we've tried to kill but |
---|
| 513 | isn't dead yet. |
---|
| 514 | |
---|
| 515 | At any time you can call track_threads, to get a dictionary with |
---|
| 516 | these keys and lists of thread_ids that fall in that status. All |
---|
| 517 | keys will be present, even if they point to emty lists. |
---|
| 518 | |
---|
| 519 | hung threads are threads that have been busy more than |
---|
| 520 | hung_thread_limit seconds. Hung threads are killed when they live |
---|
| 521 | longer than kill_thread_limit seconds. A thread is then |
---|
| 522 | considered dying for dying_limit seconds, if it is still alive |
---|
| 523 | after that it is considered a zombie. |
---|
| 524 | |
---|
| 525 | When there are no idle workers and a request comes in, another |
---|
| 526 | worker *may* be spawned. If there are less than spawn_if_under |
---|
| 527 | threads in the busy state, another thread will be spawned. So if |
---|
| 528 | the limit is 5, and there are 4 hung threads and 6 busy threads, |
---|
| 529 | no thread will be spawned. |
---|
| 530 | |
---|
| 531 | When there are more than max_zombie_threads_before_die zombie |
---|
| 532 | threads, a SystemExit exception will be raised, stopping the |
---|
| 533 | server. Use 0 or None to never raise this exception. Zombie |
---|
| 534 | threads *should* get cleaned up, but killing threads is no |
---|
| 535 | necessarily reliable. This is turned off by default, since it is |
---|
| 536 | only a good idea if you've deployed the server with some process |
---|
| 537 | watching from above (something similar to daemontools or zdaemon). |
---|
| 538 | |
---|
| 539 | Each worker thread only processes ``max_requests`` tasks before it |
---|
| 540 | dies and replaces itself with a new worker thread. |
---|
| 541 | """ |
---|
| 542 | |
---|
| 543 | |
---|
| 544 | SHUTDOWN = object() |
---|
| 545 | |
---|
| 546 | def __init__( |
---|
| 547 | self, nworkers, name="ThreadPool", daemon=False, |
---|
| 548 | max_requests=100, # threads are killed after this many requests |
---|
| 549 | hung_thread_limit=30, # when a thread is marked "hung" |
---|
| 550 | kill_thread_limit=1800, # when you kill that hung thread |
---|
| 551 | dying_limit=300, # seconds that a kill should take to go into effect (longer than this and the thread is a "zombie") |
---|
| 552 | spawn_if_under=5, # spawn if there's too many hung threads |
---|
| 553 | max_zombie_threads_before_die=0, # when to give up on the process |
---|
| 554 | hung_check_period=100, # every 100 requests check for hung workers |
---|
| 555 | logger=None, # Place to log messages to |
---|
| 556 | error_email=None, # Person(s) to notify if serious problem occurs |
---|
| 557 | ): |
---|
| 558 | """ |
---|
| 559 | Create thread pool with `nworkers` worker threads. |
---|
| 560 | """ |
---|
| 561 | self.nworkers = nworkers |
---|
| 562 | self.max_requests = max_requests |
---|
| 563 | self.name = name |
---|
| 564 | self.queue = Queue.Queue() |
---|
| 565 | self.workers = [] |
---|
| 566 | self.daemon = daemon |
---|
| 567 | if logger is None: |
---|
| 568 | logger = logging.getLogger('paste.httpserver.ThreadPool') |
---|
| 569 | if isinstance(logger, basestring): |
---|
| 570 | logger = logging.getLogger(logger) |
---|
| 571 | self.logger = logger |
---|
| 572 | self.error_email = error_email |
---|
| 573 | self._worker_count = count() |
---|
| 574 | |
---|
| 575 | assert (not kill_thread_limit |
---|
| 576 | or kill_thread_limit >= hung_thread_limit), ( |
---|
| 577 | "kill_thread_limit (%s) should be higher than hung_thread_limit (%s)" |
---|
| 578 | % (kill_thread_limit, hung_thread_limit)) |
---|
| 579 | if not killthread: |
---|
| 580 | kill_thread_limit = 0 |
---|
| 581 | self.logger.info( |
---|
| 582 | "Cannot use kill_thread_limit as ctypes/killthread is not available") |
---|
| 583 | self.kill_thread_limit = kill_thread_limit |
---|
| 584 | self.dying_limit = dying_limit |
---|
| 585 | self.hung_thread_limit = hung_thread_limit |
---|
| 586 | assert spawn_if_under <= nworkers, ( |
---|
| 587 | "spawn_if_under (%s) should be less than nworkers (%s)" |
---|
| 588 | % (spawn_if_under, nworkers)) |
---|
| 589 | self.spawn_if_under = spawn_if_under |
---|
| 590 | self.max_zombie_threads_before_die = max_zombie_threads_before_die |
---|
| 591 | self.hung_check_period = hung_check_period |
---|
| 592 | self.requests_since_last_hung_check = 0 |
---|
| 593 | # Used to keep track of what worker is doing what: |
---|
| 594 | self.worker_tracker = {} |
---|
| 595 | # Used to keep track of the workers not doing anything: |
---|
| 596 | self.idle_workers = [] |
---|
| 597 | # Used to keep track of threads that have been killed, but maybe aren't dead yet: |
---|
| 598 | self.dying_threads = {} |
---|
| 599 | # This is used to track when we last had to add idle workers; |
---|
| 600 | # we shouldn't cull extra workers until some time has passed |
---|
| 601 | # (hung_thread_limit) since workers were added: |
---|
| 602 | self._last_added_new_idle_workers = 0 |
---|
| 603 | if not daemon: |
---|
| 604 | atexit.register(self.shutdown) |
---|
| 605 | for i in range(self.nworkers): |
---|
| 606 | self.add_worker_thread(message='Initial worker pool') |
---|
| 607 | |
---|
| 608 | def add_task(self, task): |
---|
| 609 | """ |
---|
| 610 | Add a task to the queue |
---|
| 611 | """ |
---|
| 612 | self.logger.debug('Added task (%i tasks queued)', self.queue.qsize()) |
---|
| 613 | if self.hung_check_period: |
---|
| 614 | self.requests_since_last_hung_check += 1 |
---|
| 615 | if self.requests_since_last_hung_check > self.hung_check_period: |
---|
| 616 | self.requests_since_last_hung_check = 0 |
---|
| 617 | self.kill_hung_threads() |
---|
| 618 | if not self.idle_workers and self.spawn_if_under: |
---|
| 619 | # spawn_if_under can come into effect... |
---|
| 620 | busy = 0 |
---|
| 621 | now = time.time() |
---|
| 622 | self.logger.debug('No idle workers for task; checking if we need to make more workers') |
---|
| 623 | for worker in self.workers: |
---|
| 624 | if not hasattr(worker, 'thread_id'): |
---|
| 625 | # Not initialized |
---|
| 626 | continue |
---|
| 627 | time_started, info = self.worker_tracker.get(worker.thread_id, |
---|
| 628 | (None, None)) |
---|
| 629 | if time_started is not None: |
---|
| 630 | if now - time_started < self.hung_thread_limit: |
---|
| 631 | busy += 1 |
---|
| 632 | if busy < self.spawn_if_under: |
---|
| 633 | self.logger.info( |
---|
| 634 | 'No idle tasks, and only %s busy tasks; adding %s more ' |
---|
| 635 | 'workers', busy, self.spawn_if_under-busy) |
---|
| 636 | self._last_added_new_idle_workers = time.time() |
---|
| 637 | for i in range(self.spawn_if_under - busy): |
---|
| 638 | self.add_worker_thread(message='Response to lack of idle workers') |
---|
| 639 | else: |
---|
| 640 | self.logger.debug( |
---|
| 641 | 'No extra workers needed (%s busy workers)', |
---|
| 642 | busy) |
---|
| 643 | if (len(self.workers) > self.nworkers |
---|
| 644 | and len(self.idle_workers) > 3 |
---|
| 645 | and time.time()-self._last_added_new_idle_workers > self.hung_thread_limit): |
---|
| 646 | # We've spawned worers in the past, but they aren't needed |
---|
| 647 | # anymore; kill off some |
---|
| 648 | self.logger.info( |
---|
| 649 | 'Culling %s extra workers (%s idle workers present)', |
---|
| 650 | len(self.workers)-self.nworkers, len(self.idle_workers)) |
---|
| 651 | self.logger.debug( |
---|
| 652 | 'Idle workers: %s', self.idle_workers) |
---|
| 653 | for i in range(len(self.workers) - self.nworkers): |
---|
| 654 | self.queue.put(self.SHUTDOWN) |
---|
| 655 | self.queue.put(task) |
---|
| 656 | |
---|
| 657 | def track_threads(self): |
---|
| 658 | """ |
---|
| 659 | Return a dict summarizing the threads in the pool (as |
---|
| 660 | described in the ThreadPool docstring). |
---|
| 661 | """ |
---|
| 662 | result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[]) |
---|
| 663 | now = time.time() |
---|
| 664 | for worker in self.workers: |
---|
| 665 | if not hasattr(worker, 'thread_id'): |
---|
| 666 | # The worker hasn't fully started up, we should just |
---|
| 667 | # ignore it |
---|
| 668 | continue |
---|
| 669 | time_started, info = self.worker_tracker.get(worker.thread_id, |
---|
| 670 | (None, None)) |
---|
| 671 | if time_started is not None: |
---|
| 672 | if now - time_started > self.hung_thread_limit: |
---|
| 673 | result['hung'].append(worker) |
---|
| 674 | else: |
---|
| 675 | result['busy'].append(worker) |
---|
| 676 | else: |
---|
| 677 | result['idle'].append(worker) |
---|
| 678 | for thread_id, (time_killed, worker) in self.dying_threads.items(): |
---|
| 679 | if not self.thread_exists(thread_id): |
---|
| 680 | # Cull dying threads that are actually dead and gone |
---|
| 681 | self.logger.info('Killed thread %s no longer around', |
---|
| 682 | thread_id) |
---|
| 683 | try: |
---|
| 684 | del self.dying_threads[thread_id] |
---|
| 685 | except KeyError: |
---|
| 686 | pass |
---|
| 687 | continue |
---|
| 688 | if now - time_killed > self.dying_limit: |
---|
| 689 | result['zombie'].append(worker) |
---|
| 690 | else: |
---|
| 691 | result['dying'].append(worker) |
---|
| 692 | return result |
---|
| 693 | |
---|
| 694 | def kill_worker(self, thread_id): |
---|
| 695 | """ |
---|
| 696 | Removes the worker with the given thread_id from the pool, and |
---|
| 697 | replaces it with a new worker thread. |
---|
| 698 | |
---|
| 699 | This should only be done for mis-behaving workers. |
---|
| 700 | """ |
---|
| 701 | if killthread is None: |
---|
| 702 | raise RuntimeError( |
---|
| 703 | "Cannot kill worker; killthread/ctypes not available") |
---|
| 704 | thread_obj = threading._active.get(thread_id) |
---|
| 705 | killthread.async_raise(thread_id, SystemExit) |
---|
| 706 | try: |
---|
| 707 | del self.worker_tracker[thread_id] |
---|
| 708 | except KeyError: |
---|
| 709 | pass |
---|
| 710 | self.logger.info('Killing thread %s', thread_id) |
---|
| 711 | if thread_obj in self.workers: |
---|
| 712 | self.workers.remove(thread_obj) |
---|
| 713 | self.dying_threads[thread_id] = (time.time(), thread_obj) |
---|
| 714 | self.add_worker_thread(message='Replacement for killed thread %s' % thread_id) |
---|
| 715 | |
---|
| 716 | def thread_exists(self, thread_id): |
---|
| 717 | """ |
---|
| 718 | Returns true if a thread with this id is still running |
---|
| 719 | """ |
---|
| 720 | return thread_id in threading._active |
---|
| 721 | |
---|
| 722 | def add_worker_thread(self, *args, **kwargs): |
---|
| 723 | index = self._worker_count.next() |
---|
| 724 | worker = threading.Thread(target=self.worker_thread_callback, |
---|
| 725 | args=args, kwargs=kwargs, |
---|
| 726 | name=("worker %d" % index)) |
---|
| 727 | worker.setDaemon(self.daemon) |
---|
| 728 | worker.start() |
---|
| 729 | |
---|
| 730 | def kill_hung_threads(self): |
---|
| 731 | """ |
---|
| 732 | Tries to kill any hung threads |
---|
| 733 | """ |
---|
| 734 | if not self.kill_thread_limit: |
---|
| 735 | # No killing should occur |
---|
| 736 | return |
---|
| 737 | now = time.time() |
---|
| 738 | max_time = 0 |
---|
| 739 | total_time = 0 |
---|
| 740 | idle_workers = 0 |
---|
| 741 | starting_workers = 0 |
---|
| 742 | working_workers = 0 |
---|
| 743 | killed_workers = 0 |
---|
| 744 | for worker in self.workers: |
---|
| 745 | if not hasattr(worker, 'thread_id'): |
---|
| 746 | # Not setup yet |
---|
| 747 | starting_workers += 1 |
---|
| 748 | continue |
---|
| 749 | time_started, info = self.worker_tracker.get(worker.thread_id, |
---|
| 750 | (None, None)) |
---|
| 751 | if time_started is None: |
---|
| 752 | # Must be idle |
---|
| 753 | idle_workers += 1 |
---|
| 754 | continue |
---|
| 755 | working_workers += 1 |
---|
| 756 | max_time = max(max_time, now-time_started) |
---|
| 757 | total_time += now-time_started |
---|
| 758 | if now - time_started > self.kill_thread_limit: |
---|
| 759 | self.logger.warning( |
---|
| 760 | 'Thread %s hung (working on task for %i seconds)', |
---|
| 761 | worker.thread_id, now - time_started) |
---|
| 762 | try: |
---|
| 763 | import pprint |
---|
| 764 | info_desc = pprint.pformat(info) |
---|
| 765 | except: |
---|
| 766 | out = StringIO() |
---|
| 767 | traceback.print_exc(file=out) |
---|
| 768 | info_desc = 'Error:\n%s' % out.getvalue() |
---|
| 769 | self.notify_problem( |
---|
| 770 | "Killing worker thread (id=%(thread_id)s) because it has been \n" |
---|
| 771 | "working on task for %(time)s seconds (limit is %(limit)s)\n" |
---|
| 772 | "Info on task:\n" |
---|
| 773 | "%(info)s" |
---|
| 774 | % dict(thread_id=worker.thread_id, |
---|
| 775 | time=now - time_started, |
---|
| 776 | limit=self.kill_thread_limit, |
---|
| 777 | info=info_desc)) |
---|
| 778 | self.kill_worker(worker.thread_id) |
---|
| 779 | killed_workers += 1 |
---|
| 780 | if working_workers: |
---|
| 781 | ave_time = float(total_time) / working_workers |
---|
| 782 | ave_time = '%.2fsec' % ave_time |
---|
| 783 | else: |
---|
| 784 | ave_time = 'N/A' |
---|
| 785 | self.logger.info( |
---|
| 786 | "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) " |
---|
| 787 | "ave time %s, max time %.2fsec, killed %s workers" |
---|
| 788 | % (idle_workers + starting_workers + working_workers, |
---|
| 789 | working_workers, idle_workers, starting_workers, |
---|
| 790 | ave_time, max_time, killed_workers)) |
---|
| 791 | self.check_max_zombies() |
---|
| 792 | |
---|
    def check_max_zombies(self):
        """
        Check if we've reached max_zombie_threads_before_die; if so
        then kill the entire process.

        A "zombie" is a thread that was asked to die (via kill_worker)
        but is still registered as alive after ``dying_limit`` seconds.
        """
        # Zombie checking is disabled when the limit is unset or zero.
        if not self.max_zombie_threads_before_die:
            return
        found = []
        now = time.time()
        # Python 2 .items() returns a snapshot list, so deleting
        # entries from the dict inside the loop is safe.
        for thread_id, (time_killed, worker) in self.dying_threads.items():
            if not self.thread_exists(thread_id):
                # Cull dying threads that are actually dead and gone
                try:
                    del self.dying_threads[thread_id]
                except KeyError:
                    pass
                continue
            # Still alive well past the kill request: count as zombie.
            if now - time_killed > self.dying_limit:
                found.append(thread_id)
        if found:
            self.logger.info('Found %s zombie threads', found)
        if len(found) > self.max_zombie_threads_before_die:
            # Too many zombies: notify, shut the pool down (waiting up
            # to 10 seconds), and abort the process via ServerExit.
            self.logger.fatal(
                'Exiting process because %s zombie threads is more than %s limit',
                len(found), self.max_zombie_threads_before_die)
            self.notify_problem(
                "Exiting process because %(found)s zombie threads "
                "(more than limit of %(limit)s)\n"
                "Bad threads (ids):\n"
                "  %(ids)s\n"
                % dict(found=len(found),
                       limit=self.max_zombie_threads_before_die,
                       ids="\n  ".join(map(str, found))),
                subject="Process restart (too many zombie threads)")
            self.shutdown(10)
            print 'Shutting down', threading.currentThread()
            raise ServerExit(3)
---|
| 830 | |
---|
    def worker_thread_callback(self, message=None):
        """
        Worker thread should call this method to get and process queued
        callables.

        Runs as the body of each pool thread: registers the thread with
        the pool, loops pulling tasks off ``self.queue``, and cleans up
        all pool bookkeeping on the way out.  ``message`` is only used
        for the startup debug log line.
        """
        thread_obj = threading.currentThread()
        # Record our thread id on the thread object so pool-management
        # methods (status, kill_hung_threads) can find it.
        thread_id = thread_obj.thread_id = thread.get_ident()
        self.workers.append(thread_obj)
        self.idle_workers.append(thread_id)
        requests_processed = 0
        add_replacement_worker = False
        self.logger.debug('Started new worker %s: %s', thread_id, message)
        try:
            while True:
                if self.max_requests and self.max_requests < requests_processed:
                    # Replace this thread then die
                    self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
                                      % (thread_id, requests_processed, self.max_requests))
                    add_replacement_worker = True
                    break
                # Blocks until a task (or the SHUTDOWN sentinel) arrives.
                runnable = self.queue.get()
                if runnable is ThreadPool.SHUTDOWN:
                    self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
                    break
                try:
                    self.idle_workers.remove(thread_id)
                except ValueError:
                    pass
                # Mark ourselves busy: [start_time, task_info].  The
                # second slot presumably gets filled in by the task
                # itself elsewhere — not visible here; verify.
                self.worker_tracker[thread_id] = [time.time(), None]
                requests_processed += 1
                try:
                    try:
                        runnable()
                    except:
                        # We are later going to call sys.exc_clear(),
                        # removing all remnants of any exception, so
                        # we should log it now. But ideally no
                        # exception should reach this level
                        print >> sys.stderr, (
                            'Unexpected exception in worker %r' % runnable)
                        traceback.print_exc()
                    if thread_id in self.dying_threads:
                        # That last exception was intended to kill me
                        break
                finally:
                    # Always clear busy-tracking state, even if the
                    # task raised.
                    try:
                        del self.worker_tracker[thread_id]
                    except KeyError:
                        pass
                    sys.exc_clear()
                self.idle_workers.append(thread_id)
        finally:
            # Thread is exiting (shutdown, max_requests, or kill):
            # remove every trace of it from the pool's bookkeeping.
            try:
                del self.worker_tracker[thread_id]
            except KeyError:
                pass
            try:
                self.idle_workers.remove(thread_id)
            except ValueError:
                pass
            try:
                self.workers.remove(thread_obj)
            except ValueError:
                pass
            try:
                del self.dying_threads[thread_id]
            except KeyError:
                pass
            if add_replacement_worker:
                self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id)
---|
| 901 | |
---|
| 902 | def shutdown(self, force_quit_timeout=0): |
---|
| 903 | """ |
---|
| 904 | Shutdown the queue (after finishing any pending requests). |
---|
| 905 | """ |
---|
| 906 | self.logger.info('Shutting down threadpool') |
---|
| 907 | # Add a shutdown request for every worker |
---|
| 908 | for i in range(len(self.workers)): |
---|
| 909 | self.queue.put(ThreadPool.SHUTDOWN) |
---|
| 910 | # Wait for each thread to terminate |
---|
| 911 | hung_workers = [] |
---|
| 912 | for worker in self.workers: |
---|
| 913 | worker.join(0.5) |
---|
| 914 | if worker.isAlive(): |
---|
| 915 | hung_workers.append(worker) |
---|
| 916 | zombies = [] |
---|
| 917 | for thread_id in self.dying_threads: |
---|
| 918 | if self.thread_exists(thread_id): |
---|
| 919 | zombies.append(thread_id) |
---|
| 920 | if hung_workers or zombies: |
---|
| 921 | self.logger.info("%s workers didn't stop properly, and %s zombies", |
---|
| 922 | len(hung_workers), len(zombies)) |
---|
| 923 | if hung_workers: |
---|
| 924 | for worker in hung_workers: |
---|
| 925 | self.kill_worker(worker.thread_id) |
---|
| 926 | self.logger.info('Workers killed forcefully') |
---|
| 927 | if force_quit_timeout: |
---|
| 928 | hung = [] |
---|
| 929 | timed_out = False |
---|
| 930 | need_force_quit = bool(zombies) |
---|
| 931 | for workers in self.workers: |
---|
| 932 | if not timed_out and worker.isAlive(): |
---|
| 933 | timed_out = True |
---|
| 934 | worker.join(force_quit_timeout) |
---|
| 935 | if worker.isAlive(): |
---|
| 936 | print "Worker %s won't die" % worker |
---|
| 937 | need_force_quit = True |
---|
| 938 | if need_force_quit: |
---|
| 939 | import atexit |
---|
| 940 | # Remove the threading atexit callback |
---|
| 941 | for callback in list(atexit._exithandlers): |
---|
| 942 | func = getattr(callback[0], 'im_func', None) |
---|
| 943 | if not func: |
---|
| 944 | continue |
---|
| 945 | globs = getattr(func, 'func_globals', {}) |
---|
| 946 | mod = globs.get('__name__') |
---|
| 947 | if mod == 'threading': |
---|
| 948 | atexit._exithandlers.remove(callback) |
---|
| 949 | atexit._run_exitfuncs() |
---|
| 950 | print 'Forcefully exiting process' |
---|
| 951 | os._exit(3) |
---|
| 952 | else: |
---|
| 953 | self.logger.info('All workers eventually killed') |
---|
| 954 | else: |
---|
| 955 | self.logger.info('All workers stopped') |
---|
| 956 | |
---|
| 957 | def notify_problem(self, msg, subject=None, spawn_thread=True): |
---|
| 958 | """ |
---|
| 959 | Called when there's a substantial problem. msg contains the |
---|
| 960 | body of the notification, subject the summary. |
---|
| 961 | |
---|
| 962 | If spawn_thread is true, then the email will be send in |
---|
| 963 | another thread (so this doesn't block). |
---|
| 964 | """ |
---|
| 965 | if not self.error_email: |
---|
| 966 | return |
---|
| 967 | if spawn_thread: |
---|
| 968 | t = threading.Thread( |
---|
| 969 | target=self.notify_problem, |
---|
| 970 | args=(msg, subject, False)) |
---|
| 971 | t.start() |
---|
| 972 | return |
---|
| 973 | from_address = 'errors@localhost' |
---|
| 974 | if not subject: |
---|
| 975 | subject = msg.strip().splitlines()[0] |
---|
| 976 | subject = subject[:50] |
---|
| 977 | subject = '[http threadpool] %s' % subject |
---|
| 978 | headers = [ |
---|
| 979 | "To: %s" % self.error_email, |
---|
| 980 | "From: %s" % from_address, |
---|
| 981 | "Subject: %s" % subject, |
---|
| 982 | ] |
---|
| 983 | try: |
---|
| 984 | system = ' '.join(os.uname()) |
---|
| 985 | except: |
---|
| 986 | system = '(unknown)' |
---|
| 987 | body = ( |
---|
| 988 | "An error has occurred in the paste.httpserver.ThreadPool\n" |
---|
| 989 | "Error:\n" |
---|
| 990 | " %(msg)s\n" |
---|
| 991 | "Occurred at: %(time)s\n" |
---|
| 992 | "PID: %(pid)s\n" |
---|
| 993 | "System: %(system)s\n" |
---|
| 994 | "Server .py file: %(file)s\n" |
---|
| 995 | % dict(msg=msg, |
---|
| 996 | time=time.strftime("%c"), |
---|
| 997 | pid=os.getpid(), |
---|
| 998 | system=system, |
---|
| 999 | file=os.path.abspath(__file__), |
---|
| 1000 | )) |
---|
| 1001 | message = '\n'.join(headers) + "\n\n" + body |
---|
| 1002 | import smtplib |
---|
| 1003 | server = smtplib.SMTP('localhost') |
---|
| 1004 | error_emails = [ |
---|
| 1005 | e.strip() for e in self.error_email.split(",") |
---|
| 1006 | if e.strip()] |
---|
| 1007 | server.sendmail(from_address, error_emails, message) |
---|
| 1008 | server.quit() |
---|
| 1009 | print 'email sent to', error_emails, message |
---|
| 1010 | |
---|
class ThreadPoolMixIn(object):
    """
    Mix-in class to process requests from a thread pool
    """
    def __init__(self, nworkers, daemon=False, **threadpool_options):
        # Create and start the workers
        self.running = True
        assert nworkers > 0, "ThreadPoolMixIn servers must have at least one worker"
        self.thread_pool = ThreadPool(
            nworkers,
            "ThreadPoolMixIn HTTP server on %s:%d"
            % (self.server_name, self.server_port),
            daemon,
            **threadpool_options)

    def process_request(self, request, client_address):
        """
        Queue the request to be processed by one of the thread pool threads
        """
        # This sets the socket to blocking mode (and no timeout) since it
        # may take the thread pool a little while to get back to it. (This
        # is the default but since we set a timeout on the parent socket so
        # that we can trap interrupts we need to restore this.)
        request.setblocking(1)
        # Queue processing of the request
        self.thread_pool.add_task(
            lambda: self.process_request_in_thread(request, client_address))

    def handle_error(self, request, client_address):
        # ServerExit must propagate so the serving loop actually stops;
        # everything else gets the default base-class error handling.
        exc_class, exc, tb = sys.exc_info()
        if exc_class is ServerExit:
            # This is actually a request to stop the server
            raise
        return super(ThreadPoolMixIn, self).handle_error(request, client_address)

    def process_request_in_thread(self, request, client_address):
        """
        The worker thread should call back here to do the rest of the
        request processing. Error handling normally done in 'handle_request'
        must be done here.
        """
        try:
            self.finish_request(request, client_address)
            self.close_request(request)
        except:
            self.handle_error(request, client_address)
            self.close_request(request)

    def serve_forever(self):
        """
        Overrides `serve_forever` to shut the threadpool down cleanly.
        """
        try:
            while self.running:
                try:
                    self.handle_request()
                except socket.timeout:
                    # Timeout is expected, gives interrupts a chance to
                    # propagate, just keep handling
                    pass
        finally:
            self.thread_pool.shutdown()

    def server_activate(self):
        """
        Overrides server_activate to set timeout on our listener socket.
        """
        # We set the timeout here so that we can trap interrupts on windows
        self.socket.settimeout(1)
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """
        Finish pending requests and shutdown the server.
        """
        self.running = False
        self.socket.close()
        # Give in-flight requests up to 60 seconds to drain.
        self.thread_pool.shutdown(60)
---|
| 1089 | |
---|
class WSGIServerBase(SecureHTTPServer):
    """
    HTTP(S) server that carries a WSGI application and an optional
    per-connection socket timeout.
    """

    def __init__(self, wsgi_application, server_address,
                 RequestHandlerClass=None, ssl_context=None):
        SecureHTTPServer.__init__(self, server_address,
                                  RequestHandlerClass, ssl_context)
        self.wsgi_application = wsgi_application
        # When set (in seconds), applied to every accepted connection.
        self.wsgi_socket_timeout = None

    def get_request(self):
        """
        Accept a connection, applying ``wsgi_socket_timeout`` (if any)
        to the accepted socket.
        """
        conn, info = SecureHTTPServer.get_request(self)
        if self.wsgi_socket_timeout:
            conn.settimeout(self.wsgi_socket_timeout)
        return conn, info
---|
| 1104 | |
---|
class WSGIServer(ThreadingMixIn, WSGIServerBase):
    """WSGI server that handles each request in a newly created thread."""
    # Non-daemonic worker threads: in-progress requests keep the
    # process alive while they finish.
    daemon_threads = False
---|
| 1107 | |
---|
class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
    """
    WSGI server that serves requests from a fixed pool of worker
    threads rather than spawning one thread per connection.
    """
    def __init__(self, wsgi_application, server_address,
                 RequestHandlerClass=None, ssl_context=None,
                 nworkers=10, daemon_threads=False,
                 threadpool_options=None):
        # WSGIServerBase must be initialized first: ThreadPoolMixIn's
        # __init__ reads self.server_name/self.server_port, which the
        # base HTTP server sets up.
        WSGIServerBase.__init__(self, wsgi_application, server_address,
                                RequestHandlerClass, ssl_context)
        if threadpool_options is None:
            threadpool_options = {}
        ThreadPoolMixIn.__init__(self, nworkers, daemon_threads,
                                 **threadpool_options)
---|
| 1119 | |
---|
class ServerExit(SystemExit):
    """
    Raised to force the server to really exit; a plain SystemExit is
    normally caught and handled by the worker machinery.
    """
---|
| 1125 | |
---|
def serve(application, host=None, port=None, handler=None, ssl_pem=None,
          ssl_context=None, server_version=None, protocol_version=None,
          start_loop=True, daemon_threads=None, socket_timeout=None,
          use_threadpool=None, threadpool_workers=10,
          threadpool_options=None):
    """
    Serves your ``application`` over HTTP(S) via WSGI interface

    ``host``

        This is the ipaddress to bind to (or a hostname if your
        nameserver is properly configured).  This defaults to
        127.0.0.1, which is not a public interface.

    ``port``

        The port to run on, defaults to 8080 for HTTP, or 4443 for
        HTTPS.  This can be a string or an integer value.

    ``handler``

        This is the HTTP request handler to use, it defaults to
        ``WSGIHandler`` in this module.

    ``ssl_pem``

        This is an optional SSL certificate file (via OpenSSL).  You
        can supply ``*`` and a development-only certificate will be
        created for you, or you can generate a self-signed test PEM
        certificate file as follows::

            $ openssl genrsa 1024 > host.key
            $ chmod 400 host.key
            $ openssl req -new -x509 -nodes -sha1 -days 365  \\
                          -key host.key > host.cert
            $ cat host.cert host.key > host.pem
            $ chmod 400 host.pem

    ``ssl_context``

        This is an optional SSL context object for the server.  A SSL
        context will be automatically constructed for you if you supply
        ``ssl_pem``.  Supply this to use a context of your own
        construction.

    ``server_version``

        The version of the server as reported in HTTP response line.
        This defaults to something like "PasteWSGIServer/0.5".  Many
        servers hide their code-base identity with a name like
        'Amnesiac/1.0'

    ``protocol_version``

        This sets the protocol used by the server, by default
        ``HTTP/1.0``.  There is some support for ``HTTP/1.1``, which
        defaults to nicer keep-alive connections.  This server supports
        ``100 Continue``, but does not yet support HTTP/1.1 Chunked
        Encoding.  Hence, if you use HTTP/1.1, you're somewhat in error
        since chunked coding is a mandatory requirement of a HTTP/1.1
        server.  If you specify HTTP/1.1, every response *must* have a
        ``Content-Length`` and you must be careful not to read past the
        end of the socket.

    ``start_loop``

        This specifies if the server loop (aka ``server.serve_forever()``)
        should be called; it defaults to ``True``.

    ``daemon_threads``

        This flag specifies if when your webserver terminates all
        in-progress client connections should be dropped.  It defaults
        to ``False``.  You might want to set this to ``True`` if you
        are using ``HTTP/1.1`` and don't set a ``socket_timeout``.

    ``socket_timeout``

        This specifies the maximum amount of time that a connection to
        a given client will be kept open.  At this time, it is a rude
        disconnect, but at a later time it might follow the RFC a bit
        more closely.

    ``use_threadpool``

        Serve requests from a pool of worker threads
        (``threadpool_workers``) rather than creating a new thread for
        each request.  This can substantially reduce latency since
        there is a high cost associated with thread creation.

    ``threadpool_workers``

        Number of worker threads to create when ``use_threadpool`` is
        true.  This can be a string or an integer value.

    ``threadpool_options``

        A dictionary of options to be used when instantiating the
        threadpool.  See paste.httpserver.ThreadPool for specific
        options (``threadpool_workers`` is a specific option that can
        also go here).
    """
    is_ssl = False
    if ssl_pem or ssl_context:
        assert SSL, "pyOpenSSL is not installed"
        is_ssl = True
        port = int(port or 4443)
        if not ssl_context:
            if ssl_pem == '*':
                # '*' requests an auto-generated development certificate.
                ssl_context = _auto_ssl_context()
            else:
                ssl_context = SSL.Context(SSL.SSLv23_METHOD)
                ssl_context.use_privatekey_file(ssl_pem)
                ssl_context.use_certificate_chain_file(ssl_pem)

    host = host or '127.0.0.1'
    if not port:
        # Allow a "host:port" string when no explicit port was given.
        if ':' in host:
            host, port = host.split(':', 1)
        else:
            port = 8080
    server_address = (host, int(port))

    if not handler:
        handler = WSGIHandler
    if server_version:
        # NOTE: this mutates the handler *class* itself, so the version
        # strings are shared by every server using that handler class.
        handler.server_version = server_version
        handler.sys_version = None
    if protocol_version:
        assert protocol_version in ('HTTP/0.9', 'HTTP/1.0', 'HTTP/1.1')
        handler.protocol_version = protocol_version

    if use_threadpool is None:
        use_threadpool = True

    if converters.asbool(use_threadpool):
        server = WSGIThreadPoolServer(application, server_address, handler,
                                      ssl_context, int(threadpool_workers),
                                      daemon_threads,
                                      threadpool_options=threadpool_options)
    else:
        server = WSGIServer(application, server_address, handler, ssl_context)
        if daemon_threads:
            server.daemon_threads = daemon_threads

    if socket_timeout:
        server.wsgi_socket_timeout = int(socket_timeout)

    if converters.asbool(start_loop):
        protocol = is_ssl and 'https' or 'http'
        host, port = server.server_address
        if host == '0.0.0.0':
            print 'serving on 0.0.0.0:%s view at %s://127.0.0.1:%s' % \
                (port, protocol, port)
        else:
            print "serving on %s://%s:%s" % (protocol, host, port)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # allow CTRL+C to shutdown
            pass
    return server
---|
| 1287 | |
---|
| 1288 | # For paste.deploy server instantiation (egg:Paste#http) |
---|
| 1289 | # Note: this gets a separate function because it has to expect string |
---|
| 1290 | # arguments (though that's not much of an issue yet, ever?) |
---|
def server_runner(wsgi_app, global_conf, **kwargs):
    """
    paste.deploy entry point (egg:Paste#http): coerce string-valued
    configuration options to their proper types, gather threadpool
    settings, and delegate to ``serve``.
    """
    from paste.deploy.converters import asbool
    int_options = ('port', 'socket_timeout', 'threadpool_workers',
                   'threadpool_hung_thread_limit',
                   'threadpool_kill_thread_limit',
                   'threadpool_dying_limit', 'threadpool_spawn_if_under',
                   'threadpool_max_zombie_threads_before_die',
                   'threadpool_hung_check_period',
                   'threadpool_max_requests')
    for key in int_options:
        if key in kwargs:
            kwargs[key] = int(kwargs[key])
    for key in ('use_threadpool', 'daemon_threads'):
        if key in kwargs:
            kwargs[key] = asbool(kwargs[key])
    # Collect threadpool_* settings (except threadpool_workers, which
    # serve() takes directly) into a single options dict; .items()
    # snapshots the dict in Python 2, so the del is safe.
    threadpool_options = {}
    for key, val in kwargs.items():
        if key.startswith('threadpool_') and key != 'threadpool_workers':
            threadpool_options[key[len('threadpool_'):]] = val
            del kwargs[key]
    if ('error_email' not in threadpool_options
        and 'error_email' in global_conf):
        threadpool_options['error_email'] = global_conf['error_email']
    kwargs['threadpool_options'] = threadpool_options
    serve(wsgi_app, **kwargs)
---|
| 1315 | |
---|
# Extend server_runner's help with the threadpool-specific options.
# (Fixed: ``threadpool_hung_check_period`` was missing its leading
# double-backtick.)
server_runner.__doc__ = serve.__doc__ + """

You can also set these threadpool options:

``threadpool_max_requests``:

    The maximum number of requests a worker thread will process
    before dying (and replacing itself with a new worker thread).
    Default 100.

``threadpool_hung_thread_limit``:

    The number of seconds a thread can work on a task before it is
    considered hung (stuck). Default 30 seconds.

``threadpool_kill_thread_limit``:

    The number of seconds a thread can work before you should kill it
    (assuming it will never finish). Default 600 seconds (10 minutes).

``threadpool_dying_limit``:

    The length of time after killing a thread that it should actually
    disappear. If it lives longer than this, it is considered a
    "zombie". Note that even in easy situations killing a thread can
    be very slow. Default 300 seconds (5 minutes).

``threadpool_spawn_if_under``:

    If there are no idle threads and a request comes in, and there are
    less than this number of *busy* threads, then add workers to the
    pool. Busy threads are threads that have taken less than
    ``threadpool_hung_thread_limit`` seconds so far. So if you get
    *lots* of requests but they complete in a reasonable amount of time,
    the requests will simply queue up (adding more threads probably
    wouldn't speed them up). But if you have lots of hung threads and
    one more request comes in, this will add workers to handle it.
    Default 5.

``threadpool_max_zombie_threads_before_die``:

    If there are more zombies than this, just kill the process. This is
    only good if you have a monitor that will automatically restart
    the server. This can clean up the mess. Default 0 (disabled).

``threadpool_hung_check_period``:

    Every X requests, check for hung threads that need to be killed,
    or for zombie threads that should cause a restart. Default 100
    requests.

``threadpool_logger``:

    Logging messages will go the logger named here.

``threadpool_error_email`` (or global ``error_email`` setting):

    When threads are killed or the process restarted, this email
    address will be contacted (using an SMTP server on localhost).
"""
---|
| 1376 | |
---|
| 1377 | |
---|
if __name__ == '__main__':
    from paste.wsgilib import dump_environ
    #serve(dump_environ, ssl_pem="test.pem")
    # Demo: serve an app that dumps the WSGI environ, over HTTP/1.1 on
    # port 8888 (port given as a string, which serve() accepts).
    serve(dump_environ, server_version="Wombles/1.0",
          protocol_version="HTTP/1.1", port="8888")
---|
| 1383 | |
---|