1 | |
---|
# Attempt to load the threadframe module, and only define the Heartbeat class
# if it is available.

try:

    import pkg_resources
    pkg_resources.require( "threadframe" )
    # Import inside the guard too: a distribution that pkg_resources can
    # resolve but that fails to import should disable the heartbeat rather
    # than abort loading of this module.
    import threadframe

except Exception:

    # Catch Exception rather than everything so KeyboardInterrupt and
    # SystemExit raised during the import are not silently swallowed.
    import sys
    sys.stderr.write( "No threadframe module, Heartbeat not available\n" )
    Heartbeat = None

else:

    import threading
    import time
    import traceback
    import os
    import sys
---|
24 | |
---|
def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects created via the threading
    module, keyed by thread_id. Note that not all interpreter threads
    have a Thread object, only the main thread and any created via the
    'threading' module. Threads created via the low level 'thread' module
    will not be in the returned dictionary.

    Returns a new dict; mutating it does not affect the threading module.

    HACK: This mucks with the internals of the threading module since that
    module does not expose any way to match 'Thread' objects with
    interpreter thread identifiers (though it should).
    """
    rval = dict()
    # Acquire the lock and then union the contents of 'active' and 'limbo'
    # threads into the return value. Release it in a finally block so this
    # internal threading-module lock is never left held if copying fails.
    threading._active_limbo_lock.acquire()
    try:
        rval.update( threading._active )
        rval.update( threading._limbo )
    finally:
        threading._active_limbo_lock.release()
    return rval
---|
45 | |
---|
class Heartbeat( threading.Thread ):
    """
    Thread that periodically dumps the state of all threads to a file using
    the `threadframe` extension.

    Every `period` seconds it appends a full stack dump of every interpreter
    thread to `fname`, and a summary of the threads that are not blocked in
    a recognised sleeping call to `fname` + ".nonsleeping". Call shutdown()
    to stop it.
    """

    def __init__( self, name="Heartbeat Thread", period=20, fname="heartbeat.log" ):
        threading.Thread.__init__( self, name=name )
        # Set by shutdown() to end the dump loop in run()
        self.should_stop = False
        # Seconds to wait between dumps (used as the Event.wait timeout)
        self.period = period
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = fname + ".nonsleeping"
        self.file_nonsleeping = None
        # thread id -> number of consecutive heartbeats the thread was busy
        self.nonsleeping_heartbeats = { }
        # Save process id; it is written in the start/stop banners
        self.pid = os.getpid()
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    def run( self ):
        """
        Thread body: open both log files in append mode and write one full
        dump plus one non-sleeping summary per period until should_stop is
        set. Both files are closed however the loop exits.
        """
        self.file = open( self.fname, "a" )
        try:
            self.file.write( "Heartbeat for pid %d thread started at %s\n\n" % ( self.pid, time.asctime() ) )
            # Opened inside the outer try so self.file is still closed if
            # this open fails.
            self.file_nonsleeping = open( self.fname_nonsleeping, "a" )
            try:
                self.file_nonsleeping.write( "Non-Sleeping-threads for pid %d thread started at %s\n\n" % ( self.pid, time.asctime() ) )
                while not self.should_stop:
                    # Take a single snapshot and hand it to both reports so
                    # they describe the same moment.
                    threads = get_current_thread_object_dict()
                    frames = threadframe.dict()
                    self._print_all_threads( threads, frames )
                    self.print_nonsleeping( threads, frames )
                    # Sleep for a bit; shutdown() sets the event to wake us early
                    self.wait_event.wait( self.period )
            finally:
                self.file_nonsleeping.close()
        finally:
            self.file.write( "Heartbeat for pid %d thread stopped at %s\n\n" % ( self.pid, time.asctime() ) )
            self.file.close()

    @staticmethod
    def _describe_thread( threads, thread_id ):
        # Return repr() of the Thread object for thread_id, or a placeholder
        # for threads that have no threading.Thread object.
        if thread_id in threads:
            return repr( threads[thread_id] )
        return "<No Thread object>"

    def _print_all_threads( self, threads, frames ):
        # Append one traceback dump of every thread in `frames` to self.file.
        self.file.write( "Traceback dump for all threads at %s:\n\n" % time.asctime() )
        for thread_id, frame in frames.items():
            description = self._describe_thread( threads, thread_id )
            self.file.write( "Thread %s, %s:\n\n" % ( thread_id, description ) )
            traceback.print_stack( frame, file=self.file )
            self.file.write( "\n" )
        self.file.write( "End dump\n\n" )
        self.file.flush()

    def shutdown( self ):
        """
        Ask the dump loop to stop, wake it from its sleep, and wait for the
        thread to exit.
        """
        self.should_stop = True
        self.wait_event.set()
        self.join()

    def thread_is_sleeping ( self, last_stack_frame ):
        """
        Returns True if the given stack entry (filename, line number,
        function name, source text) is a known sleeper call (at least in
        python 2.5), or lies in the heartbeat module itself.

        The source text may be None when the source is unavailable; that is
        treated as an empty string.
        """
        _filename = last_stack_frame[0]
        _funcname = last_stack_frame[2]
        _text = last_stack_frame[3] or ""
        ### Ugly hack to tell if a thread is supposedly sleeping or not.
        ### These are the most common sleeping functions I've found.
        ### Is there a better way? (python interpreter internals?)
        ### Tested only with python 2.5
        if _funcname == "wait" and _text in ( "waiter.acquire()", "_sleep(delay)" ):
            return True
        if _funcname == "accept" and _text.endswith( "_sock.accept()" ):
            return True
        # Fallback skip of the heartbeat thread by filename; print_nonsleeping
        # also skips it by comparing Thread objects.
        if "/lib/galaxy/util/heartbeat.py" in _filename:
            return True
        ## By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame ( self, stack_frames ):
        """
        Scans the given traceback entries (outermost first, as returned by
        traceback.extract_stack) and returns the single most interesting one
        as a quadruple of (filename, line, function-name, text).

        Interesting means the deepest entry inside the galaxy source code
        ("/lib/galaxy/"); its filename is shortened to start at "/lib/galaxy/".
        If no such entry exists, the innermost entry is returned unchanged.
        """
        for _filename, _line, _funcname, _text in reversed( stack_frames ):
            idx = _filename.find( "/lib/galaxy/" )
            if idx != -1:
                relative_filename = _filename[idx:]
                return ( relative_filename, _line, _funcname, _text )
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping( self, threads_object_dict=None, frames=None ):
        """
        Append to self.file_nonsleeping a report of every thread that is not
        blocked in a known sleeping call, with the number of consecutive
        heartbeats it has been seen busy.

        threads_object_dict -- thread id -> Thread object mapping, as from
            get_current_thread_object_dict(); fetched fresh when None.
        frames -- thread id -> current frame mapping, as from
            threadframe.dict(); fetched fresh when None.
        """
        if threads_object_dict is None:
            threads_object_dict = get_current_thread_object_dict()
        if frames is None:
            frames = threadframe.dict()
        out = self.file_nonsleeping
        out.write( "Non-Sleeping threads at %s:\n\n" % time.asctime() )
        counts = {}
        for thread_id, frame in frames.items():
            # Never report the heartbeat thread itself
            if threads_object_dict.get( thread_id ) is self:
                continue
            tb = traceback.extract_stack( frame )
            if self.thread_is_sleeping( tb[-1] ):
                continue
            # Count non-sleeping thread heartbeats
            counts[thread_id] = self.nonsleeping_heartbeats.get( thread_id, 0 ) + 1
            description = self._describe_thread( threads_object_dict, thread_id )
            good_frame = self.get_interesting_stack_frame( tb )
            out.write( "Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s\n" %
                ( thread_id, description, counts[thread_id], good_frame[0], good_frame[1], good_frame[2], good_frame[3] ) )
        # Replace the counters in place: threads that are now sleeping, or
        # that no longer exist, drop out so the dict cannot grow forever.
        self.nonsleeping_heartbeats.clear()
        self.nonsleeping_heartbeats.update( counts )
        if not counts:
            out.write( "All threads are sleeping.\n" )
        out.write( "\n" )
        out.flush()
---|
180 | |
---|