import datetime import getopt import os localDir = os.path.dirname(__file__) import sys import unittest import geniusql class TestHarness(object): """A test harness for Geniusql.""" cover = False conquer = False def __init__(self, available_tests): """Constructor to populate the TestHarness instance. available_tests should be a list of module names (strings). """ self.available_tests = available_tests self.tests = [] def load(self, args=sys.argv[1:]): """Populate a TestHarness from sys.argv. args defaults to sys.argv[1:], but you can provide a different set of args if you like. """ longopts = ['help', 'cover', 'conquer'] longopts.extend(self.available_tests) longopts.extend(['skip_zoo', 'skip_concurrency', 'skip_isolation', 'skip_connection']) try: opts, args = getopt.getopt(args, "", longopts) except getopt.GetoptError: # print help information and exit self.help() sys.exit(2) self.tests = [] for o, a in opts: if o == '--help': self.help() sys.exit() elif o == '--cover': self.cover = True elif o == '--conquer': self.conquer = True elif o.startswith('--skip'): pass else: o = o[2:] if o in self.available_tests and o not in self.tests: self.tests.append(o) if not self.tests: self.tests = self.available_tests[:] def help(self): """Print help for test.py command-line options.""" print """ Geniusql Test Program Usage: test.py -- --cover --conquer --skip_zoo --skip_concurrency --skip_isolation --skip_connection --cover: Run coverage tools --conquer: use pyconquer to trace calls tests:""" for name in self.available_tests: print ' --' + name def run(self): """Run the test harness.""" self.load() if self.cover: self.start_coverage() try: self._run() finally: self.stop_coverage() elif self.conquer: import pyconquer f = os.path.join(os.path.dirname(__file__), "conquer.log") tr = pyconquer.Logger("geniusql", events=pyconquer.all_events) tr.out = open(f, "wb") try: tr.start() self._run() finally: tr.stop() tr.out.close() else: self._run() def _run(self): """Run the test harness.""" # Delay the import of any geniusql module so coverage can work # on the import (including class and func defs) as well. from geniusql.test import tools tools.prefer_parent_path() import geniusql print "Python version:", sys.version.split()[0] print "Geniusql version:", geniusql.__version__ for testmod in self.tests: print print "Testing %s..." % testmod[5:] mod = __import__(testmod, globals(), locals(), ['']) if hasattr(mod, 'opts'): mod.opts['Prefix'] = 'test' if hasattr(mod, 'run'): mod.run() else: suite = unittest.TestLoader().loadTestsFromName(testmod) tools.TestRunner.run(suite) def start_coverage(self): """Start the coverage tool. To use this feature, you need to download 'coverage.py', either Gareth Rees' original implementation: http://www.garethrees.org/2001/12/04/python-coverage/ or Ned Batchelder's enhanced version: http://www.nedbatchelder.com/code/modules/coverage.html If neither module is found in PYTHONPATH, coverage is silently(!) disabled. """ try: from coverage import the_coverage as coverage c = os.path.join(localDir, "coverage.cache") coverage.cache_default = c if c and os.path.exists(c): os.remove(c) coverage.start() except ImportError: coverage = None self.coverage = coverage def stop_coverage(self): """Stop the coverage tool, save results, and report.""" if self.coverage: self.coverage.save() self.report_coverage() def report_coverage(self): """Print a summary from the code coverage tool.""" # Assume we want to cover everything in "../../geniusql/" basedir = os.path.normpath(os.path.join(os.getcwd(), localDir, '../')) basedir = basedir.lower() self.coverage.get_ready() morfs = [] for x in self.coverage.cexecuted: if x.lower().startswith(basedir): morfs.append(x) total_statements = 0 total_executed = 0 print print "CODE COVERAGE (this might take a while)", for morf in morfs: sys.stdout.write(".") sys.stdout.flush() ## name = os.path.split(morf)[1] if morf.find('test') != -1: continue try: _, statements, _, missing, readable = self.coverage.analysis2(morf) n = len(statements) m = n - len(missing) total_statements = total_statements + n total_executed = total_executed + m except KeyboardInterrupt: raise except: # No, really! We truly want to ignore any other errors. pass pc = 100.0 if total_statements > 0: pc = 100.0 * total_executed / total_statements print ("\nTotal: %s Covered: %s Percent: %2d%%" % (total_statements, total_executed, pc)) class Provider(object): db = None schema = None logname = os.path.join(localDir, "geniusqltest.log") def geniusqllog(self, message): """Geniusql logger (writes to error.log).""" if isinstance(message, unicode): message = message.encode('utf8') s = "%s %s" % (datetime.datetime.now().isoformat(), message) f = open(self.logname, 'ab') f.write(s + '\n') f.close() def setup(self, provider, opts): """Set up storage for Zoo classes.""" self.db = geniusql.db(provider, **opts) self.db.log = self.geniusqllog if self.db.name != ":memory:": # sqlite in memory database will be created immediately upon connection. assert self.db.exists() == False self.db.create() assert self.db.exists() == True self.schema = self.db.schema() self.schema.create() def teardown(self): """Tear down storage for Zoo classes.""" try: self.schema.drop() except (AttributeError, NotImplementedError): pass try: self.db.drop() except (AttributeError, NotImplementedError): pass self.db.connections.shutdown() #------------------------------------------------------------------------------- class DBConfig(object): """ A helper object to ease the setup of tests at the module level in geniusql. Example: # Initialize an in memory sqlite database. db = test.DBConfig('sqlite', {'name': ':memory:'}) # Register the setup/teardown functions at the module level for nose. setUp = db.setUp tearDown = db.tearDown # Instantiate a class that will need access to the db/schema # NOTE: class Phase1_ZooTests(zoo_fixture.Phase1_ZooTests): pass db.register_db_test_class(Phase1_ZooTests) """ #--------------------------------------------------------------------------- def __init__(self, db_type, opts): self.tests = [] self.db_type = db_type self.opts = opts self.db = None self.schema = None self.provider = None #--------------------------------------------------------------------------- def register_db_test_class(self, test_class): """ Registers the given class as needing access to the db/schema. When the setup is run, we will create a db/schema and dynamically attach it to each class registered in this manner as the 'db' and 'schema' attributes respectively. """ self.tests.append(test_class) #--------------------------------------------------------------------------- def setUp(self, obj): """ This method will setup the specified db class with the given options. It will also ensure that all registered test classes have access to the db/schema attributes. """ self.provider = Provider() self.provider.setup(self.db_type, self.opts) # Attach it to all of the tests that need access for test_class in self.tests: test_class.provider = self.provider test_class.db = self.provider.db test_class.schema = self.provider.schema #--------------------------------------------------------------------------- def tearDown(self, obj): """ Tear down the given provider. """ self.provider.teardown()