import datetime
import linecache
import os
import sys
import threading

try:
    import thread  # Python 2 name
except ImportError:
    import _thread as thread  # Python 3 renamed the module

try:
    _string_types = basestring  # Python 2: covers both str and unicode
except NameError:
    _string_types = str

__version__ = "0.2"

delta_zero = datetime.timedelta(0)

# Scale constants
microseconds = usec = 1e6
milliseconds = msec = 1e3
# NOTE(review): this constant is immediately shadowed by the seconds()
# function defined below, so `stopwatch.seconds` is the function, not 1.
seconds = 1
minutes = 1 / 60.0
hours = 1 / 3600.0


def seconds(td):
    """Return the given timedelta as a floating-point number of seconds."""
    return (td.days * 86400) + td.seconds + (td.microseconds / 1e6)


def _csv_quote(label):
    """Return label as a double-quoted CSV field (non-strings are repr()ed)."""
    if not isinstance(label, _string_types):
        label = repr(label)
    return '"%s"' % label.replace('"', '""')


class Race(list):
    """A list of elapsed times, stored as (point index, timedelta) pairs."""

    # Time of the first click in this race.
    start = None
    # Time of the last click, filled in by Stopwatch.reset().
    end = None
    # Time of the most recent click; None until the first click.
    clicktime = None


class Stopwatch(object):
    """A thread-safe code timer.

    Usage:
        sw = Stopwatch()
        sw.click('start')
        foo
        sw.click('foo done')
        bar
        sw.click('bar done')
        baz
        sw.click()
        print(sw.format())
        sw.reset()

    Clicks and Races
    ----------------

    When you call the "click" method, you store the elapsed time since the
    previous click (or 0 if this is the first click). When you call the
    "reset" method, that collection of elapsed times (and any labels) is
    stored as an individual "race" in self.races.

    Swimlanes
    ---------

    By default, calls to the "click" method are threadsafe; that is, you
    can use the same Stopwatch instance in multiple threads at the same
    time. Each thread runs its own race.

    Sometimes, however, you want to collect clicks across threads, or
    collect several 'races' of clicks in parallel within a single-threaded
    application. In these cases, you can supply any hashable value, or
    callable which produces a hashable value, as the swimlane argument to
    the constructor, and it will be used to isolate and aggregate clicks
    for that value. You can also pass individual swimlane arguments to the
    "click", "reset", and "format" methods.

    For example, you might be tracking various updates to a database row
    across several web requests; you can pass the row's PK as the swimlane
    argument to collect those clicks() together into a single race.
    Note that clicks are stored in RAM, so all clicks within the same race
    need to at least occur in the same process, if not the same thread.
    """

    # An ordered list of Race objects
    # (lists of (index from self.points, elapsed)).
    races = None

    # A dict of (label, index) pairs.
    points = None

    # A dict of {swimlane: Race} for races which have not been reset yet.
    _current = None

    # Default swimlane: a hashable value, or a callable returning one.
    swimlane = None

    format_template = u"ST\u00f6PWATCH start: %(start)s clicks: %(points)s \u03a3: %(total)s"

    def __init__(self, swimlane=thread.get_ident):
        self.points = {}
        self.clear()
        self.swimlane = swimlane

    def _resolve_swimlane(self, swimlane):
        """Return the effective swimlane key (defaulting, then calling)."""
        if swimlane is None:
            swimlane = self.swimlane
        if callable(swimlane):
            swimlane = swimlane()
        return swimlane

    def get_current(self, swimlane=None):
        """Return the current race, creating a new Race if necessary.

        If swimlane is None, self.swimlane is used.
        """
        swimlane = self._resolve_swimlane(swimlane)
        try:
            return self._current[swimlane]
        except KeyError:
            race = Race()
            self._current[swimlane] = race
            return race

    current = property(get_current, doc="The current race")

    def click(self, label=None, swimlane=None):
        """Record the elapsed time, using a label if given.

        If this is the first click for the current race,
        datetime.timedelta(0) is recorded as the elapsed time.

        If swimlane is None, self.swimlane is used.
        """
        now = datetime.datetime.now()
        race = self.get_current(swimlane)
        if race.clicktime is None:
            elapsed = delta_zero
            race.start = now
        else:
            elapsed = now - race.clicktime
        race.clicktime = now

        # A new label gets the next free index; a known label keeps its own.
        line_count = len(self.points)
        if label is None:
            label = u'Point %d' % line_count
        idx = self.points.setdefault(label, line_count)
        race.append((idx, elapsed))

    def get_labels(self):
        """Return a dict of (idx, label) pairs from self.points."""
        # self.points is in (label, idx) order for max speed while gathering
        # and writing data. When reading, we want a reverse lookup, but it's
        # generally OK to provide that all at once with a big calculation:
        return dict((idx, label) for (label, idx) in self.points.items())

    def reset(self, swimlane=None, record=True):
        """Stop and return the current race.

        If 'record', store it in self.races.
        If swimlane is None, self.swimlane is used.
        """
        swimlane = self._resolve_swimlane(swimlane)
        current = self._current.pop(swimlane, Race())
        # Use the last clicktime for the end time.
        current.end = current.clicktime
        if record:
            self.races.append(current)
        return current

    def clear(self):
        """Remove all current and recorded races."""
        self.races = []
        self._current = {}

    def format(self, race=None, swimlane=None):
        """Return the current race in a format suitable for logging.

        If race is None, the current race (defined either by the given
        swimlane or self.swimlane) will be used.
        """
        if race is None:
            race = self.get_current(swimlane)
        labels = self.get_labels()
        p = {
            'points': u"".join([u"[%s=%s]" % (labels[idx], td)
                                for idx, td in race]),
            'start': race.start,
            'total': sum([td for idx, td in race], delta_zero),
        }
        return self.format_template % p

    def _filter_minima(self, minimum):
        """Return {idx: bool(c.peak/total >= minimum)} for c in self.points."""
        mask = dict((idx, False) for idx in self.points.values())
        for race in self.races:
            tot = sum([td for idx, td in race], delta_zero)
            if not tot:
                # A zero-length race cannot push any point over the minimum.
                continue
            tot = seconds(tot)
            for idx, td in race:
                if mask[idx]:
                    # This index has already proven it meets the minimum.
                    continue
                if (seconds(td) / tot) >= minimum:
                    mask[idx] = True
                    if False not in mask.values():
                        # Every point qualifies; no need to look further.
                        return mask
        return mask

    def csv_gen(self, scale=1, minimum=None):
        """Yield CSV lines for all races recorded by this stopwatch.

        All numbers yielded will be multiplied by the 'scale' argument,
        which defaults to 1. Use stopwatch.usec (1e6), for example, to
        output microseconds instead of seconds with fractions.

        If 'minimum' is provided, it must be a fraction between 0.0 and 1.0,
        and will be used to filter out lines which have no peak times above
        the minimum percentage given of the total time for each race.
        For example, if minimum is 0.1, then for each column, at least one
        race must have that column take 10% of the total time for that race;
        if no such race exists, the column will not be included in the
        output. However, if any lines are filtered out in this way, an
        "Extra" column is included with the sum of all removed lines for
        each race. This allows you to reduce output width but still make
        proper percentage calculations with the displayed lines.
        """
        if minimum:
            minmask = self._filter_minima(minimum)
        else:
            minmask = {}

        labels = self.get_labels()
        # Only emit the "Extra" column when something was actually dropped.
        filtered = False in minmask.values()
        if filtered:
            order = sorted(idx for idx in minmask if minmask[idx])
        else:
            order = sorted(self.points.values())
        # Map each point index to its column once, instead of calling
        # order.index() for every cell.
        column = dict((idx, pos) for pos, idx in enumerate(order))

        header = [_csv_quote(labels[idx]) for idx in order]
        if filtered:
            header.append(u'"Extra"')
        yield u",".join(header) + os.linesep

        for race in self.races:
            cells = [""] * len(order)
            extra = 0
            for idx, td in race:
                value = seconds(td) * scale
                if idx in column:
                    cells[column[idx]] = "%f" % value
                else:
                    extra += value
            if filtered:
                cells.append("%f" % extra)
            yield u",".join(cells) + os.linesep

    def write_csv(self, file, mode="ab+", scale=1, minimum=None):
        """Write CSV lines for all races recorded by this stopwatch to a file.

        'file' is a filename, opened with the given 'mode'. When the mode is
        binary (contains "b", as the default does), lines are encoded as
        UTF-8 before being written.

        All numbers yielded will be multiplied by the 'scale' argument,
        which defaults to 1. Use stopwatch.usec (1e6), for example, to
        output microseconds instead of seconds with fractions.
        """
        # TODO: handle file objects, not just filenames
        lines = self.csv_gen(scale=scale, minimum=minimum)
        if "b" in mode:
            lines = (line.encode("utf-8") for line in lines)
        # The context manager closes the file even if csv_gen raises.
        with open(file, mode) as handle:
            handle.writelines(lines)

    def totals(self):
        """Return the total of all races (as a race itself).

        For example, given the races:

            Race([(0, td("00:00")), (1, td("00:02")), (2, td("00:04"))])
            Race([(0, td("00:00")), (1, td("00:04")), (2, td("00:08"))])
            Race([(0, td("00:00")), (1, td("00:06")), (2, td("00:18"))])

        This will return:

            Race([(0, td("00:00")), (1, td("00:12")), (2, td("00:30"))])
        """
        t = dict((idx, delta_zero) for idx in self.points.values())
        for race in self.races:
            for idx, elapsed in race:
                if elapsed:
                    t[idx] += elapsed

        totalrace = Race(sorted(t.items()))
        if self.races:
            totalrace.start = self.races[0].start
            totalrace.end = self.races[-1].end
        return totalrace

    def average(self):
        """Return the average of all races (as a race itself).

        For example, given the races:

            Race([(0, td("00:00")), (1, td("00:02")), (2, td("00:04"))])
            Race([(0, td("00:00")), (1, td("00:04")), (2, td("00:08"))])
            Race([(0, td("00:00")), (1, td("00:06")), (2, td("00:18"))])

        This will return:

            Race([(0, td("00:00")), (1, td("00:04")), (2, td("00:10"))])

        If no races have been recorded, every known point averages to
        datetime.timedelta(0).
        """
        t = self.totals()
        numraces = len(self.races)
        if not numraces:
            # Nothing to divide by; the totals are already all zero.
            return Race(t)

        # Floor division keeps the Python 2 `timedelta / int` result on
        # Python 3 as well.
        avgrace = Race([(idx, (td // numraces)) for idx, td in t])
        avgrace.start = self.races[0].start
        avgrace.end = self.races[-1].end
        return avgrace


class _GlobalHookLocalStorage(threading.local):
    """Per-thread state for the global trace hook."""

    def __init__(self):
        # Number of active autowatches in this thread.
        self.settrace_depth = 0
        # Swimlane used for clicks made by traced code in this thread.
        self.swimlane = 0


class _GlobalTraceHook(list):
    """The list of active Autowatch objects, plus the sys.settrace hook."""

    local = _GlobalHookLocalStorage()

    def append(self, autowatch):
        """Start watching for calls to the given autowatch's target."""
        # Stick the autowatch on the list of calls to watch.
        list.append(self, autowatch)

        # Do some simple refcounting so we only hook in once
        # even if we encounter multiple autowatches.
        if self.local.settrace_depth == 0:
            # Because we're using a decorator, this call is always in the
            # same thread as the lines in our target function. So
            # threading.settrace is never needed; just call sys.settrace!
            sys.settrace(self.hook)
            # Make a new swimlane, which means a new race for our watch.
            # NOTE(review): max() of the existing keys can equal a key that
            # is already in use (e.g. two threads sharing one watch both get
            # 0) -- confirm whether "+ 1" was intended.
            self.local.swimlane = max(list(autowatch.watch._current) + [0])
        self.local.settrace_depth += 1

    def remove(self, autowatch):
        """Stop watching the given autowatch; unhook when none remain."""
        self.local.settrace_depth -= 1
        if self.local.settrace_depth == 0:
            sys.settrace(None)
            # NOTE(review): placement inside this branch is reconstructed
            # from flattened source -- confirm against the original layout.
            autowatch.watch.reset(swimlane=self.local.swimlane)
        list.remove(self, autowatch)

    def hook(self, frame, event, arg):
        """Global trace function: return a local tracer for watched calls."""
        if event == 'call':
            for autowatch in self[:]:
                target_code = getattr(autowatch.target, '__code__', None)
                curframe = frame
                # Walk up to 'depth' frames looking for the target function.
                for hop in range(autowatch.depth):
                    if curframe.f_code is target_code:
                        if (# For non-methods, always allow:
                            autowatch._instance is None or
                            # But for methods, we only hook in if the
                            # autowatch 'self' arg matches the current frame:
                            autowatch._instance in curframe.f_locals.values()):
                            return _LocalTraceHook(
                                autowatch, self.local.swimlane, hop)
                        break
                    curframe = curframe.f_back
                    if curframe is None:
                        break
        return None


_globaltracehook = _GlobalTraceHook()


class _LocalTraceHook(object):
    """Per-frame trace function which clicks the watch for each line."""

    # Label (filename, lineno, depth) to use for the next click.
    lastline = None

    def __init__(self, autowatch, swimlane, depth):
        self.autowatch = autowatch
        self.swimlane = swimlane
        self.depth = depth

    def __call__(self, frame, event, arg):
        if event in ('line', 'return', 'exception'):
            # Code run via exec or an interactive session has no __file__;
            # fall back to the code object's filename rather than raising
            # inside the tracer.
            filename = (frame.f_globals.get("__file__")
                        or frame.f_code.co_filename)
            if filename.endswith((".pyc", ".pyo")):
                filename = filename[:-1]

            if self.lastline is None:
                # Use the line just above the current one
                self.lastline = (filename, frame.f_lineno - 1, self.depth)
            self.autowatch.watch.click(self.lastline, swimlane=self.swimlane)

            if event == 'line':
                # Prep the label for the *next* click
                self.lastline = (filename, frame.f_lineno, self.depth)
        return self


class Autowatch(object):
    """A wrapper to automatically watch significant lines of a single function.

    Usage:
        # Wrap the target func
        obj.func = Autowatch(obj.func, )

        # Wait for some timing data to accumulate
        time.passes()

        # Unwrap our wrapper
        obj.func = obj.func.target
    """

    def __init__(self, target, watch=None, enabled=True, depth=1,
                 _instance=None, on_finish=None):
        self.target = target
        self.watch = watch or Stopwatch()
        self.enabled = enabled
        self.depth = depth
        # Pick a swimlane key not already in use by the watch.
        self.swimlane = id(self)
        while self.swimlane in self.watch._current:
            self.swimlane += 1
        self._instance = _instance
        self.on_finish = on_finish

    def trace(self):
        """Register this autowatch with the global trace hook."""
        _globaltracehook.append(self)

    def untrace(self):
        """Unregister this autowatch from the global trace hook."""
        _globaltracehook.remove(self)

    @property
    def is_tracing(self):
        """True if this autowatch is currently registered for tracing."""
        return self in _globaltracehook

    def __call__(self, *args, **kwargs):
        """Call the target, tracing it line by line if enabled."""
        if self._instance is not None:
            # Called through the descriptor: supply the bound 'self'.
            args = (self._instance,) + args

        if not self.enabled:
            return self.target(*args, **kwargs)

        # Acquire outside the try so a failed trace() is not "released".
        self.trace()
        try:
            result = self.target(*args, **kwargs)
        finally:
            self.untrace()
            # NOTE(review): placement inside 'finally' is reconstructed from
            # flattened source -- confirm against the original layout.
            if self.on_finish:
                self.on_finish(self)
        return result

    def __get__(self, instance, owner):
        if instance is None:
            return self

        # Make a new Autowatch instance each time a method gets an
        # instancemethod proxy, which is pretty much every time it's
        # referenced.
        return self.__class__(
            target=self.target, watch=self.watch, enabled=self.enabled,
            depth=self.depth, _instance=instance, on_finish=self.on_finish)

    def get_trace(self, race):
        """Yield (filename, lineno, depth, linetext, elapsed td) for each click.

        Each label of the race's points must be a
        (filename, lineno, depth) tuple.
        """
        labels = self.watch.get_labels()
        for idx, td in race:
            filename, lineno, depth = labels[idx]
            yield (filename, lineno, depth,
                   linecache.getline(filename, lineno).rstrip()[:80], td)

    def report(self, race):
        """Yield (filename, lineno, depth, linetext, elapsed td) for each click.

        The last line yielded will be ("", 0, -1, "", total time).
        """
        subtotals = []
        for filename, lineno, depth, text, td in self.get_trace(race):
            if depth > len(subtotals) - 1:
                # Entering a deeper level: start a new subtotal.
                subtotals.append(delta_zero)
            elif depth < len(subtotals) - 1:
                # Leaving a level: report its accumulated subtotal.
                td = subtotals.pop()
            else:
                subtotals = [t + td for t in subtotals]
            yield filename, lineno, depth, text, td

        if subtotals:
            yield "", 0, -1, "", subtotals[0]
        else:
            yield "", 0, -1, "", delta_zero

    def format(self, race, filedepth=2, percent_to_mark=10):
        """Yield formatted lines reporting on the given race.

        The slowest 'percent_to_mark' percent of lines are prefixed
        with '*'; 'filedepth' is the number of trailing path components
        of each filename to show.
        """
        ranked = sorted((line[-1], i)
                        for i, line in enumerate(self.report(race)))
        count = int(len(ranked) * (percent_to_mark / 100.0))
        # Guard the zero case: ranked[-0:] would select *every* line.
        if count > 0:
            top_i = set(i for td, i in ranked[-count:])
        else:
            top_i = set()
        del ranked

        for i, line in enumerate(self.report(race)):
            filename, lineno, depth, text, td = line
            yield ('%s%s<%s:%d>%s (%s)' %
                   ('*' if i in top_i else ' ',
                    '-' * (depth * 4),
                    '/'.join(filename.split('/')[-filedepth:]),
                    lineno, text, td))


def autowatch(watch=None, enabled=True, depth=1):
    """A decorator to automatically watch significant lines of a single function.

    Usage:

        @stopwatch.autowatch()
        def f(a, b, c):
            ...

        [f(1, 2, 3) for i in range(100)]
        f.watch.write_csv('f.csv', scale=stopwatch.usec, minimum=0.1)
    """
    def autowatch_wrapper(f):
        return Autowatch(target=f, watch=watch, enabled=enabled, depth=depth)
    return autowatch_wrapper