#!/usr/bin/python
"""PyConquer, a concurrent-thread analyzer.

Two tools live here:

* Logger -- traces execution and writes each event in a per-thread column
  ("swimlane"), optionally timing calls and watching attribute values.
* Verifier -- runs a function in several threads, serializing them through
  the trace hook so that the possible interleavings of traced events are
  explored, and records each path in a tree of Node objects.

PyConquer is written for Python 2; the few compatibility shims below only
keep the module importable under Python 3.
"""

usage = """PyConquer tracing tool

Usage: pyconquer.py [-h] [-e EVENTS] [-f PATH] [-n] [-r REGEX] MODULE.py [ARG1 ARG2 ...]

-e EVENTS, --events=EVENTS: Trace the given events (separate by commas),
    or one of these named groups:
        'c_events' = ('c_call', 'c_return', 'c_exception')
        'non_c_events' = ('call', 'return', 'exception', 'line')
        'all_events' = c_events + non_c_events
        'all_call_events' = ('call', 'return', 'c_call', 'c_return')
-f PATH, --filename=PATH: the file where trace output will be written.
    If omitted, output goes to stdout.
-h, --help: show this message and exit.
-n, --notimes: do not emit call times in the output.
-r REGEX, --regex=REGEX: limit traced modules to filenames matching the
    given regular expression. If omitted, all modules are traced.
    If the given regex begins with "!", filenames which do NOT match
    will be traced.
"""

import os
import re
try:
    set
except NameError:
    from sets import Set as set
import sys
try:
    import thread
except ImportError:
    # Python 3 renamed the low-level threading module.
    import _thread as thread
import threading
import time
import traceback
import warnings

try:
    basestring
except NameError:
    # Python 3 has no basestring (and no string exceptions).
    basestring = str

# time.clock() was removed in Python 3.8; Python 2 keeps using time.clock().
_clock = getattr(time, "perf_counter", None) or time.clock


non_c_events = ('call', 'return', 'exception', 'line')
c_events = ('c_call', 'c_return', 'c_exception')
all_events = c_events + non_c_events
all_call_events = ('call', 'return', 'c_call', 'c_return')

# Base name of this module's source file; its own frames are never traced.
_SELF_BASENAME = "pyconquer.py"

# co_name of module-level code: "?" before Python 2.5, "<module>" since.
_MODULE_CODE_NAMES = ("?", "<module>")


def _is_self_module(filename):
    """Return True if *filename* names this module's source file.

    Both "/" and "\\" are treated as path separators so the check works
    for POSIX and Windows paths alike.
    """
    return filename.replace("\\", "/").split("/")[-1] == _SELF_BASENAME


def _debug_write(text):
    """Write one space-terminated Verifier debug token to stdout."""
    sys.stdout.write(text + " ")


def scope_name(frame):
    """Return a short display name for the module that owns *frame*.

    The file extension is dropped; a package's __init__ module is shown
    as "<package dir>/__init__".
    """
    path, name = os.path.split(frame.f_code.co_filename)
    # Strip off any file extension
    name = os.path.splitext(name)[0]
    if name == "__init__":
        return "/".join((os.path.split(path)[1], name))
    return name


def is_thread_start(frame, event):
    """Return True if current frame/event is threading.Thread.start()."""
    if event == 'call' and frame.f_code.co_name == 'start':
        localself = frame.f_locals.get('self')
        if localself and isinstance(localself, threading.Thread):
            return True
    return False


def is_thread_stop(frame, event):
    """Return True if current frame/event is threading.Thread.__stop()."""
    if event == 'return' and frame.f_code.co_name == '__stop':
        # Process threading.Thread.__stop()
        localself = frame.f_locals.get('self')
        if localself and isinstance(localself, threading.Thread):
            return True
    return False


def _c_func_name(func):
    """Return a display name for the builtin *func* of a C event.

    Methods are shown as "<type of __self__>.<name>"; functions whose
    __self__ is None are shown by their repr().
    """
    owner = type(func.__self__).__name__
    if owner == 'NoneType':
        return repr(func)
    return owner + "." + func.__name__


TERMINATED = 0
RUNNING = 1


class Swimlane:
    """A thread (since thread id's get re-used, this differentiates them)."""

    def __init__(self, index):
        self.indent = 0         # current call depth, used for "-" padding
        self.index = index      # output column (Logger) / tree key (Verifier)
        self.state = RUNNING
        self.calltimes = []     # start times of open Python calls
        self.ccalltimes = []    # start times of open C calls


class Tracer:
    """Base class that installs self.hook as the trace (and profile) hook.

    Subclasses must override hook().
    """

    def __init__(self, fileregex=".*", events=None):
        self.active = False
        self.swimlanes = {}
        self.numlanes = 0
        self.tabsize = 8
        self.fileregex = fileregex
        if events is None:
            events = ['call', 'return', 'exception']
        self.events = set(events)
        # Set here too so stop() works even if start() never ran or failed.
        self._using_profile_hook = False

    def _matches_fileregex(self, filename):
        """Return True if events from *filename* pass self.fileregex.

        A leading "!" inverts the match.
        """
        if self.fileregex.startswith("!"):
            return not re.search(self.fileregex[1:], filename)
        return bool(re.search(self.fileregex, filename))

    def start(self, caller=None):
        """Install the hooks and begin tracing.

        caller: the frame whose local trace function is set to self.hook
        so the rest of that frame is traced; defaults to start()'s caller.
        """
        self._using_profile_hook = bool(self.events.intersection(c_events))
        if self._using_profile_hook:
            # Must also use the profile hook,
            # since tracing does not support C events.
            threading.setprofile(self.profhook)
            sys.setprofile(self.profhook)

        # Set the trace hook for all events other than C events
        # (it's easier to clean up nicely).
        threading.settrace(self.hook)
        sys.settrace(self.hook)

        # Set the calling frame's local trace func to our hook.
        if caller is None:
            caller = sys._getframe().f_back
        caller.f_trace = self.hook
        # Set '_caller_lineno' to help with writing tests.
        self._caller_lineno = caller.f_lineno

        self.active = True

    def stop(self, caller=None):
        """Deactivate and remove the hooks installed by start()."""
        self.active = False
        if threading._trace_hook == self.hook:
            threading.settrace(None)
        if threading._profile_hook == self.profhook:
            threading.setprofile(None)

        # Ugly hack to get around no sys.gettrace().
        if caller is None:
            caller = sys._getframe().f_back
        if caller.f_trace == self.hook:
            sys.settrace(None)
        if caller.f_back and caller.f_back.f_trace == self.hook:
            sys.settrace(None)

        # Ugly hack to get around no sys.getprofile().
        if self._using_profile_hook:
            sys.setprofile(None)

    def profhook(self, frame, event, arg):
        """Profile hook: forward only C events to self.hook.

        Python-level events already arrive through the trace hook.
        """
        if event.startswith("c_"):
            self.hook(frame, event, arg)

    def hook(self, frame, event, arg):
        """Base method; must be overridden."""
        pass


#                               Logging                               #


class WatchedValue:
    """Remembers the last seen value of obj.attr for change detection."""

    def __init__(self, obj, attr, default=None, name=None):
        self.obj = obj
        self.attr = attr
        self.default = default
        if name is None:
            try:
                name = ("%s.%s" % (obj.__class__.__module__,
                                   obj.__class__.__name__))
            except AttributeError:
                name = repr(obj)
            name = "%s.%s" % (name, attr)
        self.name = name
        self.oldvalue = self.getvalue()

    def getvalue(self):
        """Return the current value of obj.attr, or default if missing."""
        return getattr(self.obj, self.attr, self.default)


class Logger(Tracer):
    """Tracer that writes each traced event to self.out.

    Every thread gets its own indented column; a header row with the
    thread's name is written whenever output switches threads.
    """

    def __init__(self, fileregex=".*", events=None):
        Tracer.__init__(self, fileregex, events)
        self.currentlane = Swimlane(-1)
        self.watches = {}
        self.out = sys.stdout
        self.time_calls = True
        # Don't use an RLock here! hook() will hang because Thread.__delete
        # will acquire the _active_limbo_lock, and then RLock.acquire()
        # will call currentThread (which creates a _DummyThread, which will
        # try in its __init__ method to acquire the _active_limbo_lock).
        self._mutex = threading.Lock()

    def start(self):
        """Write the start banner and watches, then trace the caller."""
        self.out.write("\n\n________ pyconquer tracing started ________\n")
        self.write_watches()
        Tracer.start(self, sys._getframe().f_back)

    def stop(self):
        """Stop tracing the caller and write the stop banner."""
        self.currentlane = Swimlane(-1)
        Tracer.stop(self, sys._getframe().f_back)
        self.out.write("\n\n________ pyconquer tracing stopped ________\n")

    def hook(self, frame, event, arg):
        """Trace callback; returns itself so tracing continues locally.

        Any error is written to self.out between banners and re-raised.
        """
        try:
            if not self.active:
                return
            self._mutex.acquire(True)
            try:
                filename = frame.f_code.co_filename
                # Deny tracing this module itself (any path separator).
                if _is_self_module(filename):
                    return self.hook

                # Handle this event if requested.
                if event in self.events:
                    try:
                        process = self._matches_fileregex(filename)
                    except GeneratorExit:
                        warnings.warn("Generator bug hit; proceeding anyway. "
                                      "See http://bugs.python.org/issue1454.")
                        process = True

                    if not process:
                        # Report thread start/stop even when the threading
                        # module itself is filtered out.
                        if is_thread_start(frame, event):
                            process = True
                            event = 'threadstart'
                        elif is_thread_stop(frame, event):
                            process = True
                            event = 'threadstop'

                    if process:
                        self.process_event(frame, event, arg)

                # See if any of our watched attributes have changed.
                self.write_watches(changes_only=True)
                return self.hook
            finally:
                self._mutex.release()
        except:
            self.out.write("\n\n________ pyconquer traceback start ________\n")
            self.out.write(traceback.format_exc())
            self.out.write("\n\n________ pyconquer traceback end ________\n")
            raise

    def write_lane(self, msg, indent=None):
        """Write *msg* on a new line in the current lane's column."""
        lane = self.currentlane
        tab = " " * (self.tabsize * lane.index)
        if indent is None:
            indent = lane.indent * 2
        self.out.write("\n%s%s%s" % (tab, "-" * indent, msg))

    def _elapsed(self, starts):
        """Pop the newest start time off *starts*; return " N.NNNms".

        Returns "" when timing is off, or when no start time was recorded
        (e.g. a frame that was already running when tracing began).
        """
        if not self.time_calls:
            return ""
        end = _clock()
        if not starts:
            return ""
        return " %.3fms" % ((end - starts.pop()) * 1000)

    #                             Processors                              #

    def _lane_for(self, ident):
        """Return the Swimlane for thread *ident*, creating one if needed.

        Since we're logging only (and not verifying), a new thread reuses
        the lowest index of a terminated lane to reduce right-shift. That
        lane is removed, so one index is never handed to two threads.
        """
        lane = self.swimlanes.get(ident)
        if lane is None:
            free = [(v.index, k) for k, v in self.swimlanes.items()
                    if v.state == TERMINATED]
            if free:
                index, old_ident = min(free)
                del self.swimlanes[old_ident]
            else:
                index = self.numlanes
                self.numlanes += 1
            lane = self.swimlanes[ident] = Swimlane(index)
        elif lane.state == TERMINATED:
            # Terminated threads may have their ID's re-used;
            # but use the same lane to reduce right-shift
            lane = self.swimlanes[ident] = Swimlane(lane.index)
        return lane

    def process_event(self, frame, event, arg):
        """Switch to the current thread's lane and dispatch the event."""
        lane = self._lane_for(thread.get_ident())
        if event == 'threadstop':
            lane.state = TERMINATED

        if lane is not self.currentlane:
            self.currentlane = lane
            # Write a header row with the current thread's name.
            self.out.write("\n")
            self.write_lane("[ %s ]" % threading.currentThread().getName())

        getattr(self, "process_" + event)(frame, arg)

    def process_call(self, frame, arg):
        self.currentlane.indent += 1
        name = frame.f_code.co_name
        if name in _MODULE_CODE_NAMES:
            self.write_lane("> import %s" % frame.f_code.co_filename)
        else:
            self.write_lane("> %s (%s:%s)" %
                            (name, scope_name(frame), frame.f_lineno))
        if self.time_calls:
            self.currentlane.calltimes.append(_clock())

    def process_return(self, frame, arg):
        diff = self._elapsed(self.currentlane.calltimes)
        retval = ""
        if arg is not None:
            retval = ": %s" % repr(arg)
        name = frame.f_code.co_name
        if name in _MODULE_CODE_NAMES:
            self.write_lane("< import %s (%s lines)" %
                            (frame.f_code.co_filename, frame.f_lineno))
        else:
            self.write_lane("< %s (%s:%s)%s%s" %
                            (name, scope_name(frame), frame.f_lineno,
                             retval, diff))
        self.currentlane.indent -= 1

    def process_c_call(self, frame, arg):
        self.currentlane.indent += 1
        # arg should be the called C function
        self.write_lane("[ %s (%s:%s)" %
                        (_c_func_name(arg), scope_name(frame), frame.f_lineno))
        if self.time_calls:
            self.currentlane.ccalltimes.append(_clock())

    def process_c_return(self, frame, arg):
        diff = self._elapsed(self.currentlane.ccalltimes)
        # arg should be the called C function
        self.write_lane("] %s (%s:%s) %s" %
                        (_c_func_name(arg), scope_name(frame),
                         frame.f_lineno, diff))
        self.currentlane.indent -= 1

    def process_exception(self, frame, arg):
        exc_class = arg[0]
        if isinstance(exc_class, basestring):
            # Old-style string exception: it is a str, not a class.
            retval = repr(exc_class)
        else:
            retval = exc_class.__name__
        self.write_lane("E %s (%s:%s): %s" %
                        (frame.f_code.co_name, scope_name(frame),
                         frame.f_lineno, retval))

    def process_c_exception(self, frame, arg):
        # Note that Python exceptions will also (eventually)
        # call process_return, but C exceptions won't.
        # So we figure time and dedent here.
        diff = self._elapsed(self.currentlane.ccalltimes)
        # arg should be the called C function
        self.write_lane("e %s (%s:%s) %s: %s" %
                        (frame.f_code.co_name, scope_name(frame),
                         frame.f_lineno, diff, arg))
        self.currentlane.indent -= 1

    def process_line(self, frame, arg):
        self.write_lane(". %s (%s:%s)" %
                        (frame.f_code.co_name, scope_name(frame),
                         frame.f_lineno))

    def process_threadstart(self, frame, arg):
        threadname = frame.f_locals['self'].getName()
        # Report the line that called Thread.start().
        frame = frame.f_back
        self.write_lane("* %s (%s:%s) New Thread: %s" %
                        (frame.f_code.co_name, scope_name(frame),
                         frame.f_lineno, threadname))

    def process_threadstop(self, frame, arg):
        threadname = frame.f_locals['self'].getName()
        self.write_lane("X Thread terminated: %s" % threadname)

    #                               Watches                               #

    def watch(self, obj, attr, default=None, name=None):
        """Log the state of obj.attr each time it changes."""
        self.watches[(obj, attr)] = WatchedValue(obj, attr, default, name)

    def write_watches(self, changes_only=False):
        """Write watched values (only the changed ones if changes_only)."""
        for w in self.watches.values():
            newvalue = w.getvalue()
            if (not changes_only) or newvalue != w.oldvalue:
                w.oldvalue = newvalue
                self.write_lane("= %s: %s" % (w.name, repr(newvalue)))


def log(callback, filename, fileregex=".*", events=None):
    """Run callback() under a Logger writing to *filename*.

    The output file is closed even if stopping the tracer fails.
    """
    tr = Logger(fileregex, events=events)
    tr.out = open(filename, "wb")
    try:
        try:
            tr.start()
            callback()
        finally:
            tr.stop()
    finally:
        tr.out.close()


def log_times(globpath, pattern=".*"):
    """Return a dict of matching (float(time in ms), line) pairs for each file.

    Only lines ending in a timing column (e.g. " 1.234ms") that also match
    *pattern* are collected; the line text excludes the timing column.
    Lines that end in "ms" but carry no parsable number are skipped.
    """
    import glob
    results = {}
    for fname in glob.glob(globpath):
        f = open(fname, 'r')
        try:
            lines = []
            for line in f.readlines():
                line = line.rstrip()
                if not (line.endswith("ms") and re.search(pattern, line)):
                    continue
                parts = line.rsplit(" ", 1)
                if len(parts) != 2:
                    continue
                try:
                    ms = float(parts[1][:-2])
                except ValueError:
                    continue
                lines.append((ms, parts[0]))
        finally:
            f.close()
        results[fname] = lines
    return results


#                               Verifier                              #

SLEEPING = 2


class Node:
    """A node on an execution tree, with references to children."""

    def __init__(self, data=None, exhausted=False, children=None):
        self.data = data
        self.exhausted = exhausted
        # The 'children' dict will eventually have one entry for each lane.
        self.children = children or {}
        # Return value (or exception) of func_under_test when a run ended here.
        self.result = None

    def __repr__(self):
        return "Node(%s, %s, %s, %s)" % (self.data, self.exhausted,
                                         self.children, self.result)


class Verifier(Tracer):
    """A tool for proving the thread-safety of closed, finite machines."""

    def __init__(self, fileregex="!(threading.py)", events=None):
        if events is None:
            events = non_c_events
        Tracer.__init__(self, fileregex, events)
        self.exhausted = False
        self.debug = False
        # Serializes lane registration in thread_wrapper.
        self._lane_lock = threading.Lock()

    def start(self, caller=None):
        """Install hooks for new threads only (not the main thread)."""
        self._using_profile_hook = bool(self.events.intersection(c_events))
        if self._using_profile_hook:
            # Must also use the profile hook,
            # since tracing does not support C events.
            threading.setprofile(self.profhook)
        # Set the trace hook for all events other than C events
        # (it's easier to clean up nicely).
        threading.settrace(self.hook)
        self.active = True

    def hook(self, frame, event, arg):
        """Trace hook run in worker threads.

        Blocks a SLEEPING lane until another lane wakes it, then records
        the event as a child of the current tree node and picks the next
        lane to run.
        """
        if self.exhausted or not self.active:
            return

        lane = self.swimlanes.get(thread.get_ident())
        if lane is None:
            # Event from thread_wrapper before its lane exists; ignore it.
            return None

        filename = frame.f_code.co_filename
        # Deny tracing this module itself.
        if _is_self_module(filename):
            process = False
        else:
            process = self._matches_fileregex(filename)

        if process and lane.state != TERMINATED:
            while lane.state == SLEEPING:
                # TODO: why .001 here, but pass is OK in test()?
                time.sleep(.001)
            if self.exhausted:
                return None

            # Handle this event if requested.
            if event in self.events:
                child = self.currentnode.children.get(lane.index)
                if child is None:
                    data = {'index': lane.index,
                            'filename': frame.f_code.co_filename,
                            'function': frame.f_code.co_name,
                            'lineno': frame.f_lineno,
                            }
                    child = Node(data)
                    self.currentnode.children[lane.index] = child

                # Continue down the tree.
                self.currentnode = child
                if child.exhausted:
                    self.abort_path()
                else:
                    self._select_subpath(child, lane)

                if self.debug:
                    _debug_write("e %s:%s" % (lane.index, frame.f_lineno))

        return self.hook

    def _select_subpath(self, node, lane):
        """Sleep this lane and wake another."""
        if self.debug:
            _debug_write("[%s?" % sys._getframe().f_back.f_code.co_name)

        for nextlane in self.swimlanes.values():
            if nextlane.state == TERMINATED:
                continue
            child = node.children.get(nextlane.index)
            if child and child.exhausted:
                # This subpath has been fully explored.
                continue

            if nextlane is lane:
                # Continue with the current lane.
                if self.debug:
                    _debug_write("= %s]" % lane.index)
            else:
                # Switch processing to a different lane.
                if lane.state != TERMINATED:
                    lane.state = SLEEPING
                if self.debug:
                    _debug_write("> %s]" % nextlane.index)
                nextlane.state = RUNNING
            return

        # We've tried all possible subpaths from this node.
        # Tell all remaining threads to finish normally; the
        # current node will get marked "exhausted" inside test()
        # after all threads have terminated.
        self.abort_path()

    def abort_path(self):
        """Mark this run exhausted and wake every sleeping lane."""
        self.exhausted = True
        if self.debug:
            _debug_write("X]")
        for lane in self.swimlanes.values():
            if lane.state == SLEEPING:
                lane.state = RUNNING

    def compile(self, func, concurrency=2):
        """Graph all possible execution paths for 'func' in self.tree.

        'func' must be a closed system, with no external stimuli.
        External state is fine, as long as it is only modified from
        within 'func' during the test.

        If 'concurrency' is 2 or greater, then the code will be run by
        that many threads 'simultaneously'. The default is 2 threads.
        'func' CANNOT be allowed to spawn its own threads.

        If 'concurrency' is 1, then 'func' will be run once, and the
        test will not prove anything.
        """
        self.tree = Node(data={})
        self.func_under_test = func
        try:
            self.start()
            while True:
                if self.debug:
                    sys.stdout.write("\n----------------------------------\n")
                self.swimlanes = {}
                self.numlanes = 0
                self.currentnode = self.tree
                self.exhausted = False

                # Start concurrent calls
                ts = [threading.Thread(target=self.thread_wrapper)
                      for _ in range(concurrency)]
                for t in ts:
                    t.start()

                # Wait for all our threads to start and sleep
                while self.numlanes < concurrency:
                    pass

                # All threads are sleeping; wake one up.
                self._select_subpath(self.tree, Swimlane(-1))

                # Wait for all threads to complete.
                for t in ts:
                    t.join()

                self.currentnode.exhausted = True
                if self.tree.exhausted:
                    # Done with all runs.
                    break
        finally:
            self.stop()

    def thread_wrapper(self):
        """Thread body: register a sleeping lane, then run func_under_test.

        The return value (or exception) is stored on the current node.
        """
        # Allocate the index, register the lane and bump the count as one
        # step; unguarded, two workers could read the same numlanes.
        self._lane_lock.acquire()
        try:
            lane = Swimlane(self.numlanes)
            lane.state = SLEEPING
            self.swimlanes[thread.get_ident()] = lane
            self.numlanes += 1
        finally:
            self._lane_lock.release()

        try:
            self.currentnode.result = self.func_under_test()
        except Exception:
            x = sys.exc_info()[1]
            lane.state = TERMINATED
            if self.debug:
                _debug_write("E %s" % lane.index)
            self.currentnode.result = x
            self.abort_path()
        else:
            lane.state = TERMINATED
            if self.debug:
                _debug_write("T %s" % lane.index)
            if not self.exhausted:
                # Wake another thread
                self._select_subpath(self.currentnode, lane)

    def ptree(self, node=None):
        """Return a pretty-printed version of the execution tree."""
        def descend(node, depth):
            for k, v in node.children.items():
                output.append((" " * depth) + repr(v.data))
                descend(v, depth + 1)

        output = []
        if node is None:
            node = self.tree
        descend(node, 0)
        return "\n".join(output)

    def paths(self, node=None, tests=None):
        """Generate a list of paths which pass the given test(s).

        Each path is the list of nodes from *node* (default: the root)
        down to a node for which every test returned true.
        """
        def descend(node, trail):
            trail = trail + [node, ]
            success = True
            if tests:
                for test in tests:
                    if not test(node):
                        success = False
                        break
            if success:
                yield trail
            for k, v in node.children.items():
                for item in descend(v, trail):
                    yield item

        if node is None:
            node = self.tree
        for path in descend(node, []):
            yield path


def has_errors(exc=(AssertionError,)):
    """A Verifier test; if node resulted in an error, return True."""
    def err_test(node):
        if isinstance(node.result, exc):
            return True
        return False
    return err_test


def failed_paths(func, tests=(has_errors(),), concurrency=2):
    """Generate all paths which pass *tests* (default: AssertionError)."""
    t = Verifier()
    t.compile(func, concurrency)
    for path in t.paths(tests=tests):
        yield path


if __name__ == '__main__':
    import getopt

    _log = Logger()
    argv = sys.argv[1:]
    optmap = {
        '-e:': 'events=',
        '-f:': 'filename=',
        '-h': 'help',
        '-n': 'notimes',
        '-r:': 'regex=',
    }
    short_opts = ''.join([k[1:] for k in optmap])
    long_opts = list(optmap.values())
    try:
        options, args = getopt.getopt(argv, short_opts, long_opts)
    except getopt.GetoptError:
        sys.stderr.write("%s\n\n%s" % (sys.exc_info()[1], usage))
        sys.exit(2)

    _event_groups = {
        'non_c_events': non_c_events,
        'c_events': c_events,
        'all_events': all_events,
        'all_call_events': all_call_events,
    }
    for o, a in options:
        if o in ('-e', '--events'):
            if a in _event_groups:
                _log.events = set(_event_groups[a])
            else:
                _log.events = set([x.strip() for x in a.split(",")
                                   if x.strip()])
        elif o in ('-f', '--filename'):
            _log.out = open(a, "wb")
        elif o in ('-h', '--help'):
            print(usage)
            sys.exit()
        elif o in ('-n', '--notimes'):
            _log.time_calls = False
        elif o in ('-r', '--regex'):
            _log.fileregex = a

    if not args:
        # No script to trace.
        sys.stderr.write(usage)
        sys.exit(2)

    try:
        _run_script = execfile
    except NameError:
        def _run_script(path, namespace):
            """Python 3 stand-in for execfile(path, namespace)."""
            script = open(path)
            try:
                source = script.read()
            finally:
                script.close()
            exec(compile(source, path, "exec"), namespace)

    __file__ = args[0]
    sys.path[0] = os.path.dirname(__file__)
    sys.argv = args
    try:
        _log.start()
        # Run the script in this module's namespace, as execfile() does
        # at module level.
        _run_script(args[0], globals())
    finally:
        _log.stop()
        # Never close the interpreter's own stdout.
        if _log.out is not sys.stdout:
            _log.out.close()