# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
# (c) 2005 Clark C. Evans
# This module is part of the Python Paste Project and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
# This code was written with funding by http://prometheusresearch.com
7 | """ |
---|
8 | WSGI HTTP Server |
---|
9 | |
---|
10 | This is a minimalistic WSGI server using Python's built-in BaseHTTPServer; |
---|
11 | if pyOpenSSL is installed, it also provides SSL capabilities. |
---|
12 | """ |
---|
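
# A minimal usage sketch (the application below is illustrative, not part
# of this module):
#
#     from paste.httpserver import serve
#
#     def app(environ, start_response):
#         start_response('200 OK', [('Content-Type', 'text/plain')])
#         return ['Hello, world!\n']
#
#     serve(app, host='127.0.0.1', port='8080')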

# @@: add in protection against HTTP/1.0 clients who claim to
#     be 1.1 but do not send a Content-Length

# @@: add support for chunked encoding, this is not a 1.1 server
#     till this is completed.

import atexit
import traceback
import socket, sys, threading, urlparse, Queue, urllib
import posixpath
import time
import thread
import os
from itertools import count
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
from paste.util import converters
import logging
try:
    from paste.util import killthread
except ImportError:
    # Not available, probably no ctypes
    killthread = None

__all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve']
__version__ = "0.5"

class ContinueHook(object):
    """
    When a client request includes an 'Expect: 100-continue' header, it
    is the responsibility of the server to send 100 Continue when it
    is ready for the content body.  This allows authentication, access
    levels, and other exceptions to be detected *before* bandwidth is
    spent on the request body.

    This is an rfile wrapper that implements this functionality by
    sending 100 Continue to the client immediately after the user
    requests the content via a read() operation on the rfile stream.
    After this response is sent, it becomes a pass-through object.
    """

    def __init__(self, rfile, write):
        self._ContinueFile_rfile = rfile
        self._ContinueFile_write = write
        for attr in ('close', 'closed', 'fileno', 'flush',
                     'mode', 'bufsize', 'softspace'):
            if hasattr(rfile, attr):
                setattr(self, attr, getattr(rfile, attr))
        for attr in ('read', 'readline', 'readlines'):
            if hasattr(rfile, attr):
                setattr(self, attr, getattr(self, '_ContinueFile_' + attr))

    def _ContinueFile_send(self):
        self._ContinueFile_write("HTTP/1.1 100 Continue\r\n\r\n")
        rfile = self._ContinueFile_rfile
        for attr in ('read', 'readline', 'readlines'):
            if hasattr(rfile, attr):
                setattr(self, attr, getattr(rfile, attr))

    def _ContinueFile_read(self, size=-1):
        self._ContinueFile_send()
        return self._ContinueFile_rfile.read(size)

    def _ContinueFile_readline(self, size=-1):
        self._ContinueFile_send()
        return self._ContinueFile_rfile.readline(size)

    def _ContinueFile_readlines(self, sizehint=0):
        self._ContinueFile_send()
        return self._ContinueFile_rfile.readlines(sizehint)

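# A sketch of the exchange ContinueHook mediates (the request shown is
# illustrative; the 100 Continue line is emitted on the first read of the
# request body):
#
#     C: POST /upload HTTP/1.1
#     C: Expect: 100-continue
#     C: Content-Length: 1048576
#     C:
#     S: HTTP/1.1 100 Continue
#     C: ...request body follows...
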
class WSGIHandlerMixin:
    """
    WSGI mix-in for HTTPRequestHandler

    This class is a mix-in to provide WSGI functionality to any
    HTTPRequestHandler derivative (as provided in Python's BaseHTTPServer).
    This assumes a ``wsgi_application`` handler on ``self.server``.
    """
    lookup_addresses = True

    def log_request(self, *args, **kwargs):
        """ disable successful request logging

        Logging transactions should not be part of a WSGI server;
        if you want logging, look at paste.translogger
        """
        pass

    def log_message(self, *args, **kwargs):
        """ disable error message logging

        Logging transactions should not be part of a WSGI server;
        if you want logging, look at paste.translogger
        """
        pass

    def version_string(self):
        """ behavior that BaseHTTPServer should have had """
        if not self.sys_version:
            return self.server_version
        else:
            return self.server_version + ' ' + self.sys_version

    def wsgi_write_chunk(self, chunk):
        """
        Write a chunk of the output stream; send headers if they
        have not already been sent.
        """
        if not self.wsgi_headers_sent and not self.wsgi_curr_headers:
            raise RuntimeError(
                "Content returned before start_response called")
        if not self.wsgi_headers_sent:
            self.wsgi_headers_sent = True
            (status, headers) = self.wsgi_curr_headers
            code, message = status.split(" ", 1)
            self.send_response(int(code), message)
            #
            # HTTP/1.1 compliance; either send Content-Length or
            # signal that the connection is being closed.
            #
            send_close = True
            for (k, v) in headers:
                lk = k.lower()
                if 'content-length' == lk:
                    send_close = False
                if 'connection' == lk:
                    if 'close' == v.lower():
                        self.close_connection = 1
                        send_close = False
                self.send_header(k, v)
            if send_close:
                self.close_connection = 1
                self.send_header('Connection', 'close')

            self.end_headers()
        self.wfile.write(chunk)

    def wsgi_start_response(self, status, response_headers, exc_info=None):
        if exc_info:
            try:
                if self.wsgi_headers_sent:
                    raise exc_info[0], exc_info[1], exc_info[2]
                else:
                    # In this case, we're going to assume that the
                    # higher-level code is currently handling the
                    # issue and returning a reasonable response.
                    # self.log_error(repr(exc_info))
                    pass
            finally:
                exc_info = None
        elif self.wsgi_curr_headers:
            assert 0, "Attempt to set headers a second time w/o an exc_info"
        self.wsgi_curr_headers = (status, response_headers)
        return self.wsgi_write_chunk
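
    # Sketch of the WSGI calling sequence these two methods implement
    # (the application below is illustrative):
    #
    #     def app(environ, start_response):
    #         write = start_response('200 OK',
    #                                [('Content-Type', 'text/plain')])
    #         return ['each chunk goes through wsgi_write_chunk']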

    def wsgi_setup(self, environ=None):
        """
        Setup the member variables used by this WSGI mixin, including
        the ``environ`` and status member variables.

        After the basic environment is created, the optional ``environ``
        argument can be used to override any settings.
        """

        (scheme, netloc, path, query, fragment) = urlparse.urlsplit(self.path)
        path = urllib.unquote(path)
        endslash = path.endswith('/')
        path = posixpath.normpath(path)
        if endslash and path != '/':
            # Put the slash back...
            path += '/'
        (server_name, server_port) = self.server.server_address

        rfile = self.rfile
        if 'HTTP/1.1' == self.protocol_version and \
                '100-continue' == self.headers.get('Expect','').lower():
            rfile = ContinueHook(rfile, self.wfile.write)
        else:
            # We can put in the protection to keep from over-reading the
            # file
            try:
                content_length = int(self.headers.get('Content-Length', '0'))
            except ValueError:
                content_length = 0
            if not hasattr(self.connection, 'get_context'):
                # @@: LimitedLengthFile is currently broken in connection
                #     with SSL (sporadic errors that are difficult to trace,
                #     but ones that go away when you don't use
                #     LimitedLengthFile)
                rfile = LimitedLengthFile(rfile, content_length)

        remote_address = self.client_address[0]
        self.wsgi_environ = {
                'wsgi.version': (1,0)
               ,'wsgi.url_scheme': 'http'
               ,'wsgi.input': rfile
               ,'wsgi.errors': sys.stderr
               ,'wsgi.multithread': True
               ,'wsgi.multiprocess': False
               ,'wsgi.run_once': False
                # CGI variables required by PEP-333
               ,'REQUEST_METHOD': self.command
               ,'SCRIPT_NAME': ''  # application is root of server
               ,'PATH_INFO': path
               ,'QUERY_STRING': query
               ,'CONTENT_TYPE': self.headers.get('Content-Type', '')
               ,'CONTENT_LENGTH': self.headers.get('Content-Length', '0')
               ,'SERVER_NAME': server_name
               ,'SERVER_PORT': str(server_port)
               ,'SERVER_PROTOCOL': self.request_version
                # CGI variables not required by PEP-333
               ,'REMOTE_ADDR': remote_address
               }
        if scheme:
            self.wsgi_environ['paste.httpserver.proxy.scheme'] = scheme
        if netloc:
            self.wsgi_environ['paste.httpserver.proxy.host'] = netloc

        if self.lookup_addresses:
            # @@: make lookup_addresses actually work; at this point
            #     address_string() is overridden further down in this
            #     file and hence is a no-op
            if remote_address.startswith("192.168.") \
                    or remote_address.startswith("10.") \
                    or remote_address.startswith("172.16."):
                pass
            else:
                address_string = None  # self.address_string()
                if address_string:
                    self.wsgi_environ['REMOTE_HOST'] = address_string

        if hasattr(self.server, 'thread_pool'):
            # Now that we know what the request was for, we should
            # tell the thread pool what its worker is working on
            self.server.thread_pool.worker_tracker[thread.get_ident()][1] = self.wsgi_environ
            self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool

        for k, v in self.headers.items():
            key = 'HTTP_' + k.replace("-","_").upper()
            if key in ('HTTP_CONTENT_TYPE','HTTP_CONTENT_LENGTH'):
                continue
            self.wsgi_environ[key] = ','.join(self.headers.getheaders(k))

        if hasattr(self.connection,'get_context'):
            self.wsgi_environ['wsgi.url_scheme'] = 'https'
            # @@: extract other SSL parameters from pyOpenSSL at...
            # http://www.modssl.org/docs/2.8/ssl_reference.html#ToC25

        if environ:
            assert isinstance(environ, dict)
            self.wsgi_environ.update(environ)
            if 'on' == environ.get('HTTPS'):
                self.wsgi_environ['wsgi.url_scheme'] = 'https'

        self.wsgi_curr_headers = None
        self.wsgi_headers_sent = False
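
    # A sketch of the resulting environ for "GET /doc?x=1" from a local
    # client (values are illustrative):
    #
    #     {'wsgi.url_scheme': 'http', 'REQUEST_METHOD': 'GET',
    #      'SCRIPT_NAME': '', 'PATH_INFO': '/doc', 'QUERY_STRING': 'x=1',
    #      'SERVER_NAME': '127.0.0.1', 'SERVER_PORT': '8080',
    #      'SERVER_PROTOCOL': 'HTTP/1.0', 'REMOTE_ADDR': '127.0.0.1', ...}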

    def wsgi_connection_drop(self, exce, environ=None):
        """
        Override this if you're interested in socket exceptions, such
        as when the user clicks 'Cancel' during a file download.
        """
        pass

    def wsgi_execute(self, environ=None):
        """
        Invoke the server's ``wsgi_application``.
        """

        self.wsgi_setup(environ)

        try:
            result = self.server.wsgi_application(self.wsgi_environ,
                                                  self.wsgi_start_response)
            try:
                for chunk in result:
                    self.wsgi_write_chunk(chunk)
                if not self.wsgi_headers_sent:
                    self.wsgi_write_chunk('')
            finally:
                if hasattr(result,'close'):
                    result.close()
                result = None
        except socket.error, exce:
            self.wsgi_connection_drop(exce, environ)
            return
        except:
            if not self.wsgi_headers_sent:
                error_msg = "Internal Server Error\n"
                self.wsgi_curr_headers = (
                    '500 Internal Server Error',
                    [('Content-type', 'text/plain'),
                     ('Content-length', str(len(error_msg)))])
                self.wsgi_write_chunk(error_msg)
            raise

#
# SSL Functionality
#
# This implementation was motivated by Sebastien Martini's SSL example
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
#
try:
    from OpenSSL import SSL, tsafe
    SocketErrors = (socket.error, SSL.ZeroReturnError, SSL.SysCallError)
except ImportError:
    # Do not require pyOpenSSL to be installed, but disable SSL
    # functionality in that case.
    SSL = None
    SocketErrors = (socket.error,)
    class SecureHTTPServer(HTTPServer):
        def __init__(self, server_address, RequestHandlerClass,
                     ssl_context=None):
            assert not ssl_context, "pyOpenSSL not installed"
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
else:

    class _ConnFixer(object):
        """ wraps a socket connection so it implements makefile """
        def __init__(self, conn):
            self.__conn = conn
        def makefile(self, mode, bufsize):
            return socket._fileobject(self.__conn, mode, bufsize)
        def __getattr__(self, attrib):
            return getattr(self.__conn, attrib)

    class SecureHTTPServer(HTTPServer):
        """
        Provides SSL server functionality on top of the BaseHTTPServer
        by overriding _private_ members of Python's standard
        distribution.  The interface for this instance only changes by
        adding an optional ssl_context attribute to the constructor:

            cntx = SSL.Context(SSL.SSLv23_METHOD)
            cntx.use_privatekey_file("host.pem")
            cntx.use_certificate_file("host.pem")
        """

        def __init__(self, server_address, RequestHandlerClass,
                     ssl_context=None):
            # This overrides the implementation of __init__ in python's
            # SocketServer.TCPServer (which BaseHTTPServer.HTTPServer
            # does not override, thankfully).
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
            self.socket = socket.socket(self.address_family,
                                        self.socket_type)
            self.ssl_context = ssl_context
            if ssl_context:
                class TSafeConnection(tsafe.Connection):
                    def settimeout(self, *args):
                        self._lock.acquire()
                        try:
                            return self._ssl_conn.settimeout(*args)
                        finally:
                            self._lock.release()
                self.socket = TSafeConnection(ssl_context, self.socket)
            self.server_bind()
            self.server_activate()

        def get_request(self):
            # The default SSL request object does not seem to have a
            # ``makefile(mode, bufsize)`` method as expected by
            # SocketServer.StreamRequestHandler.
            (conn, info) = self.socket.accept()
            if self.ssl_context:
                conn = _ConnFixer(conn)
            return (conn, info)

    def _auto_ssl_context():
        import OpenSSL, time, random
        pkey = OpenSSL.crypto.PKey()
        pkey.generate_key(OpenSSL.crypto.TYPE_RSA, 768)

        cert = OpenSSL.crypto.X509()

        cert.set_serial_number(random.randint(0, sys.maxint))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)
        cert.get_subject().CN = '*'
        cert.get_subject().O = 'Dummy Certificate'
        cert.get_issuer().CN = 'Untrusted Authority'
        cert.get_issuer().O = 'Self-Signed'
        cert.set_pubkey(pkey)
        cert.sign(pkey, 'md5')

        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.use_privatekey(pkey)
        ctx.use_certificate(cert)

        return ctx

class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler):
    """
    A WSGI handler that overrides POST, GET and HEAD to delegate
    requests to the server's ``wsgi_application``.
    """
    server_version = 'PasteWSGIServer/' + __version__

    def handle_one_request(self):
        """Handle a single HTTP request.

        You normally don't need to override this method; see the class
        __doc__ string for information on how to handle specific HTTP
        commands such as GET and POST.

        """
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = 1
            return
        if not self.parse_request():  # An error code has been sent, just exit
            return
        self.wsgi_execute()

    def handle(self):
        # don't bother logging disconnects while handling a request
        try:
            BaseHTTPRequestHandler.handle(self)
        except SocketErrors, exce:
            self.wsgi_connection_drop(exce)

    def address_string(self):
        """Return the client address formatted for logging.

        This is overridden so that no hostname lookup is done.
        """
        return ''

class LimitedLengthFile(object):
    def __init__(self, file, length):
        self.file = file
        self.length = length
        self._consumed = 0

    def __repr__(self):
        base_repr = repr(self.file)
        return base_repr[:-1] + ' length=%s>' % self.length

    def read(self, length=None):
        left = self.length - self._consumed
        if length is None:
            length = left
        else:
            length = min(length, left)
        # next two lines are necessary only if read(0) blocks
        if not left:
            return ''
        data = self.file.read(length)
        self._consumed += len(data)
        return data

    def readline(self, *args):
        data = self.file.readline(self.length - self._consumed)
        self._consumed += len(data)
        return data

    def readlines(self, hint=None):
        data = self.file.readlines(hint)
        for chunk in data:
            self._consumed += len(chunk)
        return data

    def __iter__(self):
        return self

    def next(self):
        if self.length - self._consumed <= 0:
            raise StopIteration
        return self.readline()

    ## Optional methods ##

    def seek(self, place):
        self.file.seek(place)
        self._consumed = place

    def tell(self):
        if hasattr(self.file, 'tell'):
            return self.file.tell()
        else:
            return self._consumed
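
# Sketch: LimitedLengthFile stops reads at the declared Content-Length
# (the data below is illustrative):
#
#     >>> from StringIO import StringIO
#     >>> f = LimitedLengthFile(StringIO('hello world'), 5)
#     >>> f.read()
#     'hello'
#     >>> f.read()
#     ''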

class ThreadPool(object):
    """
    Generic thread pool with a queue of callables to consume.

    Keeps a notion of the status of its worker threads:

    idle: worker thread with nothing to do

    busy: worker thread doing its job

    hung: worker thread that's been doing a job for too long

    dying: a hung thread that has been killed, but hasn't died quite
    yet.

    zombie: what was a worker thread that we've tried to kill but
    isn't dead yet.

    At any time you can call track_threads, to get a dictionary with
    these keys and lists of thread_ids that fall in that status.  All
    keys will be present, even if they point to empty lists.

    hung threads are threads that have been busy more than
    hung_thread_limit seconds.  Hung threads are killed when they live
    longer than kill_thread_limit seconds.  A thread is then
    considered dying for dying_limit seconds; if it is still alive
    after that, it is considered a zombie.

    When there are no idle workers and a request comes in, another
    worker *may* be spawned.  If there are fewer than spawn_if_under
    threads in the busy state, another thread will be spawned.  So if
    the limit is 5, and there are 4 hung threads and 6 busy threads,
    no thread will be spawned.

    When there are more than max_zombie_threads_before_die zombie
    threads, a SystemExit exception will be raised, stopping the
    server.  Use 0 or None to never raise this exception.  Zombie
    threads *should* get cleaned up, but killing threads is not
    necessarily reliable.  This is turned off by default, since it is
    only a good idea if you've deployed the server with some process
    watching from above (something similar to daemontools or zdaemon).

    Each worker thread only processes ``max_requests`` tasks before it
    dies and replaces itself with a new worker thread.
    """

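    # A minimal usage sketch (the task below is a hypothetical
    # zero-argument callable):
    #
    #     pool = ThreadPool(10, name="demo pool")
    #     pool.add_task(some_callable)
    #     ...
    #     pool.shutdown()
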
    SHUTDOWN = object()

    def __init__(
        self, nworkers, name="ThreadPool", daemon=False,
        max_requests=100,       # threads are killed after this many requests
        hung_thread_limit=30,   # when a thread is marked "hung"
        kill_thread_limit=1800, # when you kill that hung thread
        dying_limit=300,        # seconds that a kill should take to go into
                                # effect (longer than this and the thread is
                                # a "zombie")
        spawn_if_under=5,       # spawn if there's too many hung threads
        max_zombie_threads_before_die=0, # when to give up on the process
        hung_check_period=100,  # every 100 requests check for hung workers
        logger=None,            # Place to log messages to
        error_email=None,       # Person(s) to notify if serious problem occurs
        ):
        """
        Create thread pool with `nworkers` worker threads.
        """
        self.nworkers = nworkers
        self.max_requests = max_requests
        self.name = name
        self.queue = Queue.Queue()
        self.workers = []
        self.daemon = daemon
        if logger is None:
            logger = logging.getLogger('paste.httpserver.ThreadPool')
        if isinstance(logger, basestring):
            logger = logging.getLogger(logger)
        self.logger = logger
        self.error_email = error_email
        self._worker_count = count()

        assert (not kill_thread_limit
                or kill_thread_limit >= hung_thread_limit), (
            "kill_thread_limit (%s) should be higher than hung_thread_limit (%s)"
            % (kill_thread_limit, hung_thread_limit))
        if not killthread:
            kill_thread_limit = 0
            self.logger.info(
                "Cannot use kill_thread_limit as ctypes/killthread is not available")
        self.kill_thread_limit = kill_thread_limit
        self.dying_limit = dying_limit
        self.hung_thread_limit = hung_thread_limit
        assert spawn_if_under <= nworkers, (
            "spawn_if_under (%s) should be less than nworkers (%s)"
            % (spawn_if_under, nworkers))
        self.spawn_if_under = spawn_if_under
        self.max_zombie_threads_before_die = max_zombie_threads_before_die
        self.hung_check_period = hung_check_period
        self.requests_since_last_hung_check = 0
        # Used to keep track of what worker is doing what:
        self.worker_tracker = {}
        # Used to keep track of the workers not doing anything:
        self.idle_workers = []
        # Used to keep track of threads that have been killed, but maybe
        # aren't dead yet:
        self.dying_threads = {}
        # This is used to track when we last had to add idle workers;
        # we shouldn't cull extra workers until some time has passed
        # (hung_thread_limit) since workers were added:
        self._last_added_new_idle_workers = 0
        if not daemon:
            atexit.register(self.shutdown)
        for i in range(self.nworkers):
            self.add_worker_thread(message='Initial worker pool')

    def add_task(self, task):
        """
        Add a task to the queue.
        """
        self.logger.debug('Added task (%i tasks queued)', self.queue.qsize())
        if self.hung_check_period:
            self.requests_since_last_hung_check += 1
            if self.requests_since_last_hung_check > self.hung_check_period:
                self.requests_since_last_hung_check = 0
                self.kill_hung_threads()
        if not self.idle_workers and self.spawn_if_under:
            # spawn_if_under can come into effect...
            busy = 0
            now = time.time()
            self.logger.debug('No idle workers for task; checking if we need to make more workers')
            for worker in self.workers:
                if not hasattr(worker, 'thread_id'):
                    # Not initialized
                    continue
                time_started, info = self.worker_tracker.get(worker.thread_id,
                                                             (None, None))
                if time_started is not None:
                    if now - time_started < self.hung_thread_limit:
                        busy += 1
            if busy < self.spawn_if_under:
                self.logger.info(
                    'No idle workers, and only %s busy workers; adding %s more '
                    'workers', busy, self.spawn_if_under-busy)
                self._last_added_new_idle_workers = time.time()
                for i in range(self.spawn_if_under - busy):
                    self.add_worker_thread(message='Response to lack of idle workers')
            else:
                self.logger.debug(
                    'No extra workers needed (%s busy workers)',
                    busy)
        if (len(self.workers) > self.nworkers
                and len(self.idle_workers) > 3
                and time.time()-self._last_added_new_idle_workers > self.hung_thread_limit):
            # We've spawned workers in the past, but they aren't needed
            # anymore; kill off some
            self.logger.info(
                'Culling %s extra workers (%s idle workers present)',
                len(self.workers)-self.nworkers, len(self.idle_workers))
            self.logger.debug(
                'Idle workers: %s', self.idle_workers)
            for i in range(len(self.workers) - self.nworkers):
                self.queue.put(self.SHUTDOWN)
        self.queue.put(task)

    def track_threads(self):
        """
        Return a dict summarizing the threads in the pool (as
        described in the ThreadPool docstring).
        """
        result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[])
        now = time.time()
        for worker in self.workers:
            if not hasattr(worker, 'thread_id'):
                # The worker hasn't fully started up, we should just
                # ignore it
                continue
            time_started, info = self.worker_tracker.get(worker.thread_id,
                                                         (None, None))
            if time_started is not None:
                if now - time_started > self.hung_thread_limit:
                    result['hung'].append(worker)
                else:
                    result['busy'].append(worker)
            else:
                result['idle'].append(worker)
        for thread_id, (time_killed, worker) in self.dying_threads.items():
            if not self.thread_exists(thread_id):
                # Cull dying threads that are actually dead and gone
                self.logger.info('Killed thread %s no longer around',
                                 thread_id)
                try:
                    del self.dying_threads[thread_id]
                except KeyError:
                    pass
                continue
            if now - time_killed > self.dying_limit:
                result['zombie'].append(worker)
            else:
                result['dying'].append(worker)
        return result
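
    # Example shape of the returned dict (the thread object shown is
    # illustrative):
    #
    #     {'idle': [<Thread(worker 1, started ...)>], 'busy': [],
    #      'hung': [], 'dying': [], 'zombie': []}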

    def kill_worker(self, thread_id):
        """
        Removes the worker with the given thread_id from the pool, and
        replaces it with a new worker thread.

        This should only be done for mis-behaving workers.
        """
        if killthread is None:
            raise RuntimeError(
                "Cannot kill worker; killthread/ctypes not available")
        thread_obj = threading._active.get(thread_id)
        killthread.async_raise(thread_id, SystemExit)
        try:
            del self.worker_tracker[thread_id]
        except KeyError:
            pass
        self.logger.info('Killing thread %s', thread_id)
        if thread_obj in self.workers:
            self.workers.remove(thread_obj)
        self.dying_threads[thread_id] = (time.time(), thread_obj)
        self.add_worker_thread(message='Replacement for killed thread %s' % thread_id)

    def thread_exists(self, thread_id):
        """
        Returns true if a thread with this id is still running.
        """
        return thread_id in threading._active

    def add_worker_thread(self, *args, **kwargs):
        index = self._worker_count.next()
        worker = threading.Thread(target=self.worker_thread_callback,
                                  args=args, kwargs=kwargs,
                                  name=("worker %d" % index))
        worker.setDaemon(self.daemon)
        worker.start()

    def kill_hung_threads(self):
        """
        Tries to kill any hung threads.
        """
        if not self.kill_thread_limit:
            # No killing should occur
            return
        now = time.time()
        max_time = 0
        total_time = 0
        idle_workers = 0
        starting_workers = 0
        working_workers = 0
        killed_workers = 0
        for worker in self.workers:
            if not hasattr(worker, 'thread_id'):
                # Not setup yet
                starting_workers += 1
                continue
            time_started, info = self.worker_tracker.get(worker.thread_id,
                                                         (None, None))
            if time_started is None:
                # Must be idle
                idle_workers += 1
                continue
            working_workers += 1
            max_time = max(max_time, now-time_started)
            total_time += now-time_started
            if now - time_started > self.kill_thread_limit:
                self.logger.warning(
                    'Thread %s hung (working on task for %i seconds)',
                    worker.thread_id, now - time_started)
                try:
                    import pprint
                    info_desc = pprint.pformat(info)
                except:
                    from cStringIO import StringIO
                    out = StringIO()
                    traceback.print_exc(file=out)
                    info_desc = 'Error:\n%s' % out.getvalue()
                self.notify_problem(
                    "Killing worker thread (id=%(thread_id)s) because it has been \n"
                    "working on task for %(time)s seconds (limit is %(limit)s)\n"
                    "Info on task:\n"
                    "%(info)s"
                    % dict(thread_id=worker.thread_id,
                           time=now - time_started,
                           limit=self.kill_thread_limit,
                           info=info_desc))
                self.kill_worker(worker.thread_id)
                killed_workers += 1
        if working_workers:
            ave_time = float(total_time) / working_workers
            ave_time = '%.2fsec' % ave_time
        else:
            ave_time = 'N/A'
        self.logger.info(
            "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) "
            "ave time %s, max time %.2fsec, killed %s workers"
            % (idle_workers + starting_workers + working_workers,
               working_workers, idle_workers, starting_workers,
               ave_time, max_time, killed_workers))
        self.check_max_zombies()

    def check_max_zombies(self):
        """
        Check if we've reached max_zombie_threads_before_die; if so
        then kill the entire process.
        """
        if not self.max_zombie_threads_before_die:
            return
        found = []
        now = time.time()
        for thread_id, (time_killed, worker) in self.dying_threads.items():
            if not self.thread_exists(thread_id):
                # Cull dying threads that are actually dead and gone
                try:
                    del self.dying_threads[thread_id]
                except KeyError:
                    pass
                continue
            if now - time_killed > self.dying_limit:
                found.append(thread_id)
        if found:
            self.logger.info('Found %s zombie threads', found)
        if len(found) > self.max_zombie_threads_before_die:
            self.logger.fatal(
                'Exiting process because %s zombie threads is more than %s limit',
                len(found), self.max_zombie_threads_before_die)
            self.notify_problem(
                "Exiting process because %(found)s zombie threads "
                "(more than limit of %(limit)s)\n"
                "Bad threads (ids):\n"
                "  %(ids)s\n"
                % dict(found=len(found),
                       limit=self.max_zombie_threads_before_die,
                       ids="\n  ".join(map(str, found))),
                subject="Process restart (too many zombie threads)")
            self.shutdown(10)
            print 'Shutting down', threading.currentThread()
            raise ServerExit(3)

    def worker_thread_callback(self, message=None):
        """
        Worker thread should call this method to get and process queued
        callables.
        """
        thread_obj = threading.currentThread()
        thread_id = thread_obj.thread_id = thread.get_ident()
        self.workers.append(thread_obj)
        self.idle_workers.append(thread_id)
        requests_processed = 0
        add_replacement_worker = False
        self.logger.debug('Started new worker %s: %s', thread_id, message)
        try:
            while True:
                if self.max_requests and self.max_requests < requests_processed:
                    # Replace this thread then die
                    self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
                                      % (thread_id, requests_processed, self.max_requests))
                    add_replacement_worker = True
                    break
                runnable = self.queue.get()
                if runnable is ThreadPool.SHUTDOWN:
                    self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
                    break
                try:
                    self.idle_workers.remove(thread_id)
                except ValueError:
                    pass
                self.worker_tracker[thread_id] = [time.time(), None]
                requests_processed += 1
                try:
                    try:
                        runnable()
                    except:
                        # We are later going to call sys.exc_clear(),
                        # removing all remnants of any exception, so
                        # we should log it now.  But ideally no
                        # exception should reach this level
                        print >> sys.stderr, (
                            'Unexpected exception in worker %r' % runnable)
                        traceback.print_exc()
                    if thread_id in self.dying_threads:
                        # That last exception was intended to kill me
                        break
                finally:
                    try:
                        del self.worker_tracker[thread_id]
                    except KeyError:
                        pass
                    sys.exc_clear()
                self.idle_workers.append(thread_id)
        finally:
            try:
                del self.worker_tracker[thread_id]
            except KeyError:
                pass
            try:
                self.idle_workers.remove(thread_id)
            except ValueError:
                pass
            try:
                self.workers.remove(thread_obj)
            except ValueError:
                pass
            try:
                del self.dying_threads[thread_id]
            except KeyError:
                pass
            if add_replacement_worker:
                self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id)

    def shutdown(self, force_quit_timeout=0):
        """
        Shutdown the queue (after finishing any pending requests).
        """
        self.logger.info('Shutting down threadpool')
        # Add a shutdown request for every worker
        for i in range(len(self.workers)):
            self.queue.put(ThreadPool.SHUTDOWN)
        # Wait for each thread to terminate
        hung_workers = []
        for worker in self.workers:
            worker.join(0.5)
            if worker.isAlive():
                hung_workers.append(worker)
        zombies = []
        for thread_id in self.dying_threads:
            if self.thread_exists(thread_id):
                zombies.append(thread_id)
        if hung_workers or zombies:
            self.logger.info("%s workers didn't stop properly, and %s zombies",
                             len(hung_workers), len(zombies))
            if hung_workers:
                for worker in hung_workers:
                    self.kill_worker(worker.thread_id)
                self.logger.info('Workers killed forcefully')
            if force_quit_timeout:
                timed_out = False
                need_force_quit = bool(zombies)
                for worker in self.workers:
                    if not timed_out and worker.isAlive():
                        timed_out = True
                        worker.join(force_quit_timeout)
                    if worker.isAlive():
                        print "Worker %s won't die" % worker
                        need_force_quit = True
                if need_force_quit:
                    import atexit
                    # Remove the threading atexit callback
                    for callback in list(atexit._exithandlers):
                        func = getattr(callback[0], 'im_func', None)
                        if not func:
                            continue
                        globs = getattr(func, 'func_globals', {})
                        mod = globs.get('__name__')
                        if mod == 'threading':
                            atexit._exithandlers.remove(callback)
                    atexit._run_exitfuncs()
                    print 'Forcefully exiting process'
                    os._exit(3)
                else:
                    self.logger.info('All workers eventually killed')
        else:
            self.logger.info('All workers stopped')

    def notify_problem(self, msg, subject=None, spawn_thread=True):
        """
        Called when there's a substantial problem.  msg contains the
        body of the notification, subject the summary.

        If spawn_thread is true, then the email will be sent in
        another thread (so this doesn't block).
        """
        if not self.error_email:
            return
        if spawn_thread:
            t = threading.Thread(
                target=self.notify_problem,
                args=(msg, subject, False))
            t.start()
            return
        from_address = 'errors@localhost'
        if not subject:
            subject = msg.strip().splitlines()[0]
            subject = subject[:50]
            subject = '[http threadpool] %s' % subject
        headers = [
            "To: %s" % self.error_email,
            "From: %s" % from_address,
            "Subject: %s" % subject,
            ]
        try:
            system = ' '.join(os.uname())
        except:
            system = '(unknown)'
        body = (
            "An error has occurred in the paste.httpserver.ThreadPool\n"
            "Error:\n"
            "  %(msg)s\n"
            "Occurred at: %(time)s\n"
            "PID: %(pid)s\n"
            "System: %(system)s\n"
            "Server .py file: %(file)s\n"
            % dict(msg=msg,
                   time=time.strftime("%c"),
                   pid=os.getpid(),
                   system=system,
                   file=os.path.abspath(__file__),
                   ))
        message = '\n'.join(headers) + "\n\n" + body
        import smtplib
        server = smtplib.SMTP('localhost')
        error_emails = [
            e.strip() for e in self.error_email.split(",")
            if e.strip()]
        server.sendmail(from_address, error_emails, message)
        server.quit()
        print 'email sent to', error_emails, message

class ThreadPoolMixIn(object):
    """
    Mix-in class to process requests from a thread pool.
    """
    def __init__(self, nworkers, daemon=False, **threadpool_options):
        # Create and start the workers
        self.running = True
        assert nworkers > 0, "ThreadPoolMixIn servers must have at least one worker"
        self.thread_pool = ThreadPool(
            nworkers,
            "ThreadPoolMixIn HTTP server on %s:%d"
            % (self.server_name, self.server_port),
            daemon,
            **threadpool_options)

    def process_request(self, request, client_address):
        """
        Queue the request to be processed by one of the thread pool threads.
        """
        # This sets the socket to blocking mode (and no timeout) since it
        # may take the thread pool a little while to get back to it.  (This
        # is the default, but since we set a timeout on the parent socket so
        # that we can trap interrupts, we need to restore it here.)
        request.setblocking(1)
        # Queue processing of the request
        self.thread_pool.add_task(
            lambda: self.process_request_in_thread(request, client_address))

    def handle_error(self, request, client_address):
        exc_class, exc, tb = sys.exc_info()
        if exc_class is ServerExit:
            # This is actually a request to stop the server
            raise
        return super(ThreadPoolMixIn, self).handle_error(request, client_address)

    def process_request_in_thread(self, request, client_address):
        """
        The worker thread should call back here to do the rest of the
        request processing.  Error handling normally done in
        'handle_request' must be done here.
        """
        try:
            self.finish_request(request, client_address)
            self.close_request(request)
        except:
            self.handle_error(request, client_address)
            self.close_request(request)

    def serve_forever(self):
        """
        Overrides `serve_forever` to shut the threadpool down cleanly.
        """
        try:
            while self.running:
                try:
                    self.handle_request()
                except socket.timeout:
                    # Timeout is expected, gives interrupts a chance to
                    # propagate, just keep handling
                    pass
        finally:
            self.thread_pool.shutdown()

    def server_activate(self):
        """
        Overrides server_activate to set timeout on our listener socket.
        """
        # We set the timeout here so that we can trap interrupts on windows
        self.socket.settimeout(1)
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """
        Finish pending requests and shutdown the server.
        """
        self.running = False
        self.socket.close()
        self.thread_pool.shutdown(60)

class WSGIServerBase(SecureHTTPServer):
    def __init__(self, wsgi_application, server_address,
                 RequestHandlerClass=None, ssl_context=None):
        SecureHTTPServer.__init__(self, server_address,
                                  RequestHandlerClass, ssl_context)
        self.wsgi_application = wsgi_application
        self.wsgi_socket_timeout = None

    def get_request(self):
        # If there is a socket_timeout, set it on the accepted socket
        (conn, info) = SecureHTTPServer.get_request(self)
        if self.wsgi_socket_timeout:
            conn.settimeout(self.wsgi_socket_timeout)
        return (conn, info)

class WSGIServer(ThreadingMixIn, WSGIServerBase):
    daemon_threads = False

class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
    def __init__(self, wsgi_application, server_address,
                 RequestHandlerClass=None, ssl_context=None,
                 nworkers=10, daemon_threads=False,
                 threadpool_options=None):
        WSGIServerBase.__init__(self, wsgi_application, server_address,
                                RequestHandlerClass, ssl_context)
        if threadpool_options is None:
            threadpool_options = {}
        ThreadPoolMixIn.__init__(self, nworkers, daemon_threads,
                                 **threadpool_options)

class ServerExit(SystemExit):
    """
    Raised to tell the server to really exit (SystemExit is normally
    caught).
    """

def serve(application, host=None, port=None, handler=None, ssl_pem=None,
          ssl_context=None, server_version=None, protocol_version=None,
          start_loop=True, daemon_threads=None, socket_timeout=None,
          use_threadpool=None, threadpool_workers=10,
          threadpool_options=None):
    """
    Serves your ``application`` over HTTP(S) via the WSGI interface.

    ``host``

        This is the IP address to bind to (or a hostname if your
        nameserver is properly configured).  This defaults to
        127.0.0.1, which is not a public interface.

    ``port``

        The port to run on, defaults to 8080 for HTTP, or 4443 for
        HTTPS.  This can be a string or an integer value.

    ``handler``

        This is the HTTP request handler to use; it defaults to
        ``WSGIHandler`` in this module.

    ``ssl_pem``

        This is an optional SSL certificate file (via OpenSSL).  You can
        supply ``*`` and a development-only certificate will be
        created for you, or you can generate a self-signed test PEM
        certificate file as follows::

            $ openssl genrsa 1024 > host.key
            $ chmod 400 host.key
            $ openssl req -new -x509 -nodes -sha1 -days 365  \\
                          -key host.key > host.cert
            $ cat host.cert host.key > host.pem
            $ chmod 400 host.pem

    ``ssl_context``

        This is an optional SSL context object for the server.  An SSL
        context will be automatically constructed for you if you supply
        ``ssl_pem``.  Supply this to use a context of your own
        construction.

    ``server_version``

        The version of the server as reported in the HTTP response line.
        This defaults to something like "PasteWSGIServer/0.5".  Many
        servers hide their code-base identity with a name like
        'Amnesiac/1.0'.

    ``protocol_version``

        This sets the protocol used by the server, by default
        ``HTTP/1.0``.  There is some support for ``HTTP/1.1``, which
        defaults to nicer keep-alive connections.  This server supports
        ``100 Continue``, but does not yet support HTTP/1.1 Chunked
        Encoding.  Hence, if you use HTTP/1.1, you're somewhat in error
        since chunked coding is a mandatory requirement of an HTTP/1.1
        server.  If you specify HTTP/1.1, every response *must* have a
        ``Content-Length`` and you must be careful not to read past the
        end of the socket.

    ``start_loop``

        This specifies if the server loop (aka ``server.serve_forever()``)
        should be called; it defaults to ``True``.

    ``daemon_threads``

        This flag specifies if, when your webserver terminates, all
        in-progress client connections should be dropped.  It defaults
        to ``False``.  You might want to set this to ``True`` if you
        are using ``HTTP/1.1`` and don't set a ``socket_timeout``.

    ``socket_timeout``

        This specifies the maximum amount of time that a connection to a
        given client will be kept open.  At this time, it is a rude
        disconnect, but at a later time it might follow the RFC a bit
        more closely.

    ``use_threadpool``

        Serve requests from a pool of worker threads (``threadpool_workers``)
        rather than creating a new thread for each request.  This can
        substantially reduce latency since there is a high cost associated
        with thread creation.

    ``threadpool_workers``

        Number of worker threads to create when ``use_threadpool`` is true.
        This can be a string or an integer value.

    ``threadpool_options``

        A dictionary of options to be used when instantiating the
        threadpool.  See paste.httpserver.ThreadPool for specific
        options (``threadpool_workers`` is a specific option that can
        also go here).
    """
    is_ssl = False
    if ssl_pem or ssl_context:
        assert SSL, "pyOpenSSL is not installed"
        is_ssl = True
        port = int(port or 4443)
        if not ssl_context:
            if ssl_pem == '*':
                ssl_context = _auto_ssl_context()
            else:
                ssl_context = SSL.Context(SSL.SSLv23_METHOD)
                ssl_context.use_privatekey_file(ssl_pem)
                ssl_context.use_certificate_chain_file(ssl_pem)

    host = host or '127.0.0.1'
    if not port:
        if ':' in host:
            host, port = host.split(':', 1)
        else:
            port = 8080
    server_address = (host, int(port))

    if not handler:
        handler = WSGIHandler
    if server_version:
        handler.server_version = server_version
        handler.sys_version = None
    if protocol_version:
        assert protocol_version in ('HTTP/0.9', 'HTTP/1.0', 'HTTP/1.1')
        handler.protocol_version = protocol_version

    if use_threadpool is None:
        use_threadpool = True

    if converters.asbool(use_threadpool):
        server = WSGIThreadPoolServer(application, server_address, handler,
                                      ssl_context, int(threadpool_workers),
                                      daemon_threads,
                                      threadpool_options=threadpool_options)
    else:
        server = WSGIServer(application, server_address, handler, ssl_context)
        if daemon_threads:
            server.daemon_threads = daemon_threads

    if socket_timeout:
        server.wsgi_socket_timeout = int(socket_timeout)

    if converters.asbool(start_loop):
        protocol = is_ssl and 'https' or 'http'
        host, port = server.server_address
        if host == '0.0.0.0':
            print 'serving on 0.0.0.0:%s view at %s://127.0.0.1:%s' % \
                (port, protocol, port)
        else:
            print "serving on %s://%s:%s" % (protocol, host, port)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # allow CTRL+C to shutdown
            pass
    return server

# For paste.deploy server instantiation (egg:Paste#http)
# Note: this gets a separate function because it has to expect string
# arguments (though that's not much of an issue yet, ever?)
def server_runner(wsgi_app, global_conf, **kwargs):
    from paste.deploy.converters import asbool
    for name in ['port', 'socket_timeout', 'threadpool_workers',
                 'threadpool_hung_thread_limit',
                 'threadpool_kill_thread_limit',
                 'threadpool_dying_limit', 'threadpool_spawn_if_under',
                 'threadpool_max_zombie_threads_before_die',
                 'threadpool_hung_check_period',
                 'threadpool_max_requests']:
        if name in kwargs:
            kwargs[name] = int(kwargs[name])
    for name in ['use_threadpool', 'daemon_threads']:
        if name in kwargs:
            kwargs[name] = asbool(kwargs[name])
    threadpool_options = {}
    for name, value in kwargs.items():
        if name.startswith('threadpool_') and name != 'threadpool_workers':
            threadpool_options[name[len('threadpool_'):]] = value
            del kwargs[name]
    if ('error_email' not in threadpool_options
            and 'error_email' in global_conf):
        threadpool_options['error_email'] = global_conf['error_email']
    kwargs['threadpool_options'] = threadpool_options
    serve(wsgi_app, **kwargs)
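
# A hedged example of wiring this up from a PasteDeploy .ini file (the
# section name and option values below are illustrative):
#
#     [server:main]
#     use = egg:Paste#http
#     host = 0.0.0.0
#     port = 8080
#     use_threadpool = true
#     threadpool_workers = 10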

server_runner.__doc__ = serve.__doc__ + """

    You can also set these threadpool options:

    ``threadpool_max_requests``:

        The maximum number of requests a worker thread will process
        before dying (and replacing itself with a new worker thread).
        Default 100.

    ``threadpool_hung_thread_limit``:

        The number of seconds a thread can work on a task before it is
        considered hung (stuck).  Default 30 seconds.

    ``threadpool_kill_thread_limit``:

        The number of seconds a thread can work before you should kill it
        (assuming it will never finish).  Default 1800 seconds (30 minutes).

    ``threadpool_dying_limit``:

        The length of time after killing a thread that it should actually
        disappear.  If it lives longer than this, it is considered a
        "zombie".  Note that even in easy situations killing a thread can
        be very slow.  Default 300 seconds (5 minutes).

    ``threadpool_spawn_if_under``:

        If there are no idle threads and a request comes in, and there are
        fewer than this number of *busy* threads, then add workers to the
        pool.  Busy threads are threads that have taken less than
        ``threadpool_hung_thread_limit`` seconds so far.  So if you get
        *lots* of requests but they complete in a reasonable amount of time,
        the requests will simply queue up (adding more threads probably
        wouldn't speed them up).  But if you have lots of hung threads and
        one more request comes in, this will add workers to handle it.
        Default 5.

    ``threadpool_max_zombie_threads_before_die``:

        If there are more zombies than this, just kill the process.  This is
        only good if you have a monitor that will automatically restart
        the server.  This can clean up the mess.  Default 0 (disabled).

    ``threadpool_hung_check_period``:

        Every X requests, check for hung threads that need to be killed,
        or for zombie threads that should cause a restart.  Default 100
        requests.

    ``threadpool_logger``:

        Logging messages will go to the logger named here.

    ``threadpool_error_email`` (or global ``error_email`` setting):

        When threads are killed or the process restarted, this email
        address will be contacted (using an SMTP server on localhost).
"""


if __name__ == '__main__':
    from paste.wsgilib import dump_environ
    #serve(dump_environ, ssl_pem="test.pem")
    serve(dump_environ, server_version="Wombles/1.0",
          protocol_version="HTTP/1.1", port="8888")