[2] | 1 | |
---|
# Attempt to load threadframe module, and only define Heartbeat class
# if available

try:

    import pkg_resources
    pkg_resources.require( "threadframe" )

except Exception:

    # Any ordinary failure here (pkg_resources cannot be imported, or the
    # "threadframe" requirement cannot be satisfied) means the feature is
    # unavailable.  `Exception` rather than a bare `except:` so that
    # SystemExit and KeyboardInterrupt are not swallowed at import time.
    import sys
    sys.stderr.write( "No threadframe module, Heartbeat not available\n" )
    # Callers can test `Heartbeat is None` to detect the missing feature.
    Heartbeat = None

else:

    import threading
    import threadframe
    import time
    import traceback
    import os
    import sys
---|
| 24 | |
---|
def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects created via the threading
    module keyed by thread_id. Note that not all interpreter threads
    have a thread object, only the main thread and any created via the
    'threading' module. Threads created via the low level 'thread' module
    will not be in the returned dictionary.

    The returned dictionary is a fresh copy; modifying it does not affect
    the threading module.

    HACK: This mucks with the internals of the threading module since that
    module does not expose any way to match 'Thread' objects with
    interpreter thread identifiers (though it should).

    NOTE(review): entries copied from `threading._limbo` may be keyed by
    something other than a thread id (CPython appears to key that mapping
    by the Thread object itself) -- confirm before relying on the keys of
    not-yet-started threads.
    """
    rval = dict()
    # Acquire the lock and then union the contents of 'active' and 'limbo'
    # threads into the return value.  The release lives in a `finally` so
    # that a failure while copying can never leave threading's internal
    # lock held, which would block every later thread start or exit.
    threading._active_limbo_lock.acquire()
    try:
        rval.update( threading._active )
        rval.update( threading._limbo )
    finally:
        threading._active_limbo_lock.release()
    return rval
---|
| 45 | |
---|
class Heartbeat( threading.Thread ):
    """
    Thread that periodically dumps the state of all threads to a file using
    the `threadframe` extension.

    Each beat appends a full traceback of every thread to `fname` and a
    summary of the threads that do not look like they are sleeping to
    `fname + ".nonsleeping"`.  Call `shutdown()` to stop the thread.
    """
    def __init__( self, name="Heartbeat Thread", period=20, fname="heartbeat.log" ):
        """
        name   -- thread name handed to `threading.Thread`
        period -- timeout, in seconds, waited between two dumps
        fname  -- path of the full traceback log; the non-sleeping log uses
                  the same path with a ".nonsleeping" suffix
        """
        threading.Thread.__init__( self, name=name )
        self.should_stop = False
        self.period = period
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = fname + ".nonsleeping"
        self.file_nonsleeping = None
        # thread_id -> number of consecutive beats the thread was seen awake
        self.nonsleeping_heartbeats = { }
        # Save process id
        self.pid = os.getpid()
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    def run( self ):
        """
        Thread body: open both logs in append mode, then dump all threads
        and wait `period` seconds, repeating until `should_stop` is set.
        Whatever was opened is closed on the way out, even if opening the
        second log or a dump raises.
        """
        try:
            self.file = open( self.fname, "a" )
            self.file.write( "Heartbeat for pid %d thread started at %s\n\n" % ( self.pid, time.asctime() ) )
            self.file_nonsleeping = open( self.fname_nonsleeping, "a" )
            self.file_nonsleeping.write( "Non-Sleeping-threads for pid %d thread started at %s\n\n" % ( self.pid, time.asctime() ) )
            while not self.should_stop:
                threads = get_current_thread_object_dict()
                self._dump_all_threads( threads )
                self.print_nonsleeping( threads )
                # Sleep for a bit; shutdown() sets the event to cut this short
                self.wait_event.wait( self.period )
        finally:
            self._close_logs()

    def _dump_all_threads( self, threads ):
        """Append a timestamped traceback of every thread to the main log."""
        out = self.file
        # Print separator with timestamp
        out.write( "Traceback dump for all threads at %s:\n\n" % time.asctime() )
        # Print the thread states
        for thread_id, frame in threadframe.dict().items():
            out.write( "Thread %s, %s:\n\n" % ( thread_id, self._thread_label( threads, thread_id ) ) )
            traceback.print_stack( frame, file=out )
            out.write( "\n" )
        out.write( "End dump\n\n" )
        out.flush()

    def _thread_label( self, threads, thread_id ):
        """Return repr() of the Thread object known for `thread_id`, or a placeholder."""
        if thread_id in threads:
            return repr( threads[thread_id] )
        return "<No Thread object>"

    def _close_logs( self ):
        """Write the stop marker to the main log and close every log that was opened."""
        try:
            if self.file is not None:
                try:
                    self.file.write( "Heartbeat for pid %d thread stopped at %s\n\n" % ( self.pid, time.asctime() ) )
                finally:
                    self.file.close()
        finally:
            if self.file_nonsleeping is not None:
                self.file_nonsleeping.close()

    def shutdown( self ):
        """Ask the thread to stop, wake it if it is waiting, and join it."""
        self.should_stop = True
        self.wait_event.set()
        self.join()

    def thread_is_sleeping ( self, last_stack_frame ):
        """
        Returns True if the given stack-frame represents a known
        sleeper function (at least in python 2.5)

        `last_stack_frame` is one (filename, line, function-name, text)
        entry as produced by `traceback.extract_stack`.
        """
        _filename = last_stack_frame[0]
        _funcname = last_stack_frame[2]
        # The source text can be None when the source line is unavailable;
        # treat that as "no text" so the string checks below cannot raise.
        _text = last_stack_frame[3] or ""
        ### Ugly hack to tell if a thread is supposedly sleeping or not
        ### These are the most common sleeping functions I've found.
        ### Is there a better way? (python interpreter internals?)
        ### Tested only with python 2.5
        if _funcname == "wait" and _text in ( "waiter.acquire()", "_sleep(delay)" ):
            return True
        if _funcname == "accept" and _text.endswith( "_sock.accept()" ):
            return True
        #Ugly hack: always skip the heartbeat thread
        #TODO: get the current thread-id in python
        #      skip heartbeat thread by thread-id, not by filename
        if "/lib/galaxy/util/heartbeat.py" in _filename:
            return True
        ## By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame ( self, stack_frames ):
        """
        Scans the given backtrace stack frames and returns a single
        quadruple of (filename, line, function-name, text) for
        the single, deepest, most interesting frame.
        Interesting being:
            inside the galaxy source code ("/lib/galaxy/"),
            preferably not an egg.

        For a galaxy frame the filename is shortened to start at
        "/lib/galaxy/".  When no galaxy frame exists the innermost frame is
        returned unchanged.  `stack_frames` must not be empty.
        """
        # Walk from the innermost frame outwards so the deepest match wins.
        for _filename, _line, _funcname, _text in reversed( stack_frames ):
            idx = _filename.find( "/lib/galaxy/" )
            if idx != -1:
                return ( _filename[idx:], _line, _funcname, _text )
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping( self, threads_object_dict ):
        """
        Append to the non-sleeping log one entry per thread whose innermost
        frame is not a known sleeping call, with the number of consecutive
        beats it has been seen that way.

        threads_object_dict -- thread_id -> Thread mapping used to label the
                               threads; a fresh snapshot is taken when None
        """
        out = self.file_nonsleeping
        out.write( "Non-Sleeping threads at %s:\n\n" % time.asctime() )
        threads = threads_object_dict
        if threads is None:
            threads = get_current_thread_object_dict()
        frames = threadframe.dict()
        # Forget threads that no longer exist, so the table does not grow
        # forever and a reused thread id does not inherit an old count.
        for stale_id in [ tid for tid in self.nonsleeping_heartbeats if tid not in frames ]:
            del self.nonsleeping_heartbeats[stale_id]
        all_threads_are_sleeping = True
        for thread_id, frame in frames.items():
            tb = traceback.extract_stack( frame )
            if self.thread_is_sleeping( tb[-1] ):
                # Sleeping again: the consecutive-beats counter starts over
                self.nonsleeping_heartbeats.pop( thread_id, None )
                continue

            # Count non-sleeping thread heartbeats
            beats = self.nonsleeping_heartbeats.get( thread_id, 0 ) + 1
            self.nonsleeping_heartbeats[thread_id] = beats

            good_frame = self.get_interesting_stack_frame( tb )
            out.write( "Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s\n" % \
                ( thread_id, self._thread_label( threads, thread_id ), beats, good_frame[0], good_frame[1], good_frame[2], good_frame[3] ) )
            all_threads_are_sleeping = False

        if all_threads_are_sleeping:
            out.write( "All threads are sleeping.\n" )
        out.write( "\n" )
        out.flush()
---|
| 180 | |
---|