"""Test fixture for Geniusql.""" import datetime try: set except NameError: from sets import Set as set import sys import threading import time import traceback import unittest import warnings import geniusql from geniusql import errors, typerefs from geniusql.test import tools, test from geniusql import logic, logicfuncs logicfuncs.init() Jan_1_2001 = datetime.date(2001, 1, 1) every13days = [Jan_1_2001 + datetime.timedelta(x * 13) for x in range(20)] every17days = [Jan_1_2001 + datetime.timedelta(x * 17) for x in range(20)] del x # TODO STRABS: Evaluate if we want this or not, I think we do. warnings.filterwarnings("ignore", "The given precision and scale*") class ZooTestsBaseClass(unittest.TestCase): db = None schema = None provider = None class Phase1_ZooTests(ZooTestsBaseClass): """ The ZooTests setup the schema, and create the tables. The rest of the tests depend on them, so we do this one as a first step and the rest as a second step. Because nose orders its tests alphabetically we will call them "Phase1" and "Phase2" respectively. """ def assertEqualSet(self, a, b): self.assertEqual(set(a), set(b)) def test_01_create_tables(self): Animal = self.schema.table('Animal') Animal['ID'] = self.schema.column(int, autoincrement=True, key=True) ## Animal.add_index('ID') Animal['ZooID'] = self.schema.column(int) Animal['Name'] = self.schema.column(hints={'bytes': 100}) Animal['Species'] = self.schema.column(hints={'bytes': 100}) Animal['Legs'] = self.schema.column(int, default=4) Animal['PreviousZoos'] = self.schema.column(list, hints={'bytes': 8000}) Animal['LastEscape'] = self.schema.column(datetime.datetime) Animal['Birthdate'] = self.schema.column(datetime.datetime, hints={'timezone_aware': True}) Animal['Lifespan'] = self.schema.column(float, hints={'precision': 4}) Animal['Age'] = self.schema.column(datetime.timedelta) Animal['MotherID'] = self.schema.column(int) Animal['PreferredFoodID'] = self.schema.column(int) Animal['AlternateFoodID'] = self.schema.column(int) Animal.add_index('ZooID') Animal.references['Animal'] = ('ID', 'Animal', 'MotherID') Animal.references['Visit'] = ('ID', 'Visit', 'AnimalID') self.schema['Animal'] = Animal Zoo = self.schema.table('Zoo') Zoo['ID'] = self.schema.column(int, autoincrement=True, key=True) Zoo.add_index('ID') Zoo['Name'] = self.schema.column() Zoo['Founded'] = self.schema.column(datetime.date) Zoo['Opens'] = self.schema.column(datetime.time) Zoo['LastEscape'] = self.schema.column(datetime.datetime) if typerefs.fixedpoint: # Explicitly set precision and scale so test_msaccess # can test CURRENCY type Zoo['Admission'] = self.schema.column(typerefs.fixedpoint.FixedPoint, hints={'precision': 6, 'scale': 2}) else: Zoo['Admission'] = self.schema.column(float) Zoo.references['Animal'] = ('ID', 'Animal', 'ZooID') self.schema['Zoo'] = Zoo Food = self.schema.table('Food') Food['ID'] = self.schema.column(int, autoincrement=True, key=True) Food.add_index('ID') Food['Name'] = self.schema.column() Food['NutritionValue'] = self.schema.column(int) Food.references['Animal'] = ('ID', 'Animal', 'PreferredFoodID') Animal.references['Alternate Food'] = ('AlternateFoodID', 'Food', 'ID') self.schema['Food'] = Food Vet = self.schema.table('Vet') Vet['ID'] = c = self.schema.column(int, autoincrement=True, key=True) c.initial = 200 Vet.add_index('ID') Vet['Name'] = self.schema.column() Vet['NameLen'] = self.schema.column(int) Vet['ZooID'] = self.schema.column(int) Vet.add_index('ZooID') Vet['City'] = self.schema.column() Vet.references['Zoo'] = ('ZooID', 'Zoo', 'ID') Vet.references['Visit'] = ('ID', 'Visit', 'VetID') self.schema['Vet'] = Vet Visit = self.schema.table('Visit') Visit['ID'] = self.schema.column(int, autoincrement=True, key=True) Visit.add_index('ID') Visit['VetID'] = self.schema.column(int) Visit.add_index('VetID') Visit['ZooID'] = self.schema.column(int) Visit.add_index('ZooID') Visit['AnimalID'] = self.schema.column(int) Visit.add_index('AnimalID') Visit['Date'] = self.schema.column(datetime.date) Visit.references['Animal'] = ('AnimalID', 'Animal', 'ID') self.schema['Visit'] = Visit Exhibit = self.schema.table('Exhibit') # Make this a string to help test vs unicode. # Also, use a smaller field size so Firebird doesn't choke # when forming the primary key. # See http://www.volny.cz/iprenosil/interbase/ip_ib_indexcalculator.htm Exhibit['Name'] = self.schema.column(str, key=True, hints={'bytes': 192}) Exhibit.add_index('Name') Exhibit['ZooID'] = self.schema.column(int, key=True) Exhibit.add_index('ZooID') Exhibit['Animals'] = self.schema.column(list) Exhibit['PettingAllowed'] = self.schema.column(bool) Exhibit['Creators'] = self.schema.column(tuple) if typerefs.decimal: Exhibit['Acreage'] = self.schema.column(typerefs.decimal.Decimal) else: Exhibit['Acreage'] = self.schema.column(float) Exhibit.references['Zoo'] = ('ZooID', 'Zoo', 'ID') self.schema['Exhibit'] = Exhibit t = self.schema.table('NothingToDoWithZoos') t['ALong'] = self.schema.column(long, hints={'bytes': 1}) t['AFloat'] = self.schema.column(float, hints={'precision': 1}) if typerefs.decimal: t['ADecimal'] = self.schema.column(typerefs.decimal.Decimal, hints={'precision': 1, 'scale': 1}) if typerefs.fixedpoint: t['AFixed'] = self.schema.column(typerefs.fixedpoint.FixedPoint, hints={'precision': 1, 'scale': 1}) self.schema['NothingToDoWithZoos'] = t def test_02_populate(self): wap = self.schema['Zoo'].insert(Name='Wild Animal Park', Founded=datetime.date(2000, 1, 1), # 59 can give rounding errors with divmod, which # AdapterFromADO needs to correct. Opens=datetime.time(8, 15, 59), LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7), Admission=4.95, )['ID'] sdz = self.schema['Zoo'].insert(Name = 'San Diego Zoo', # This early date should play havoc with a number # of implementations. Founded = datetime.date(1835, 9, 13), Opens = datetime.time(9, 0, 0), Admission = 0, )['ID'] self.schema['Zoo'].insert(Name = u'Montr\xe9al Biod\xf4me', Founded = datetime.date(1992, 6, 19), Opens = datetime.time(9, 0, 0), Admission = 11.75, ) seaworld = self.schema['Zoo'].insert(Name = 'Sea_World', Admission = 60)['ID'] # Let's add a crazy futuristic Zoo to test large date values. lp = self.schema['Zoo'].insert(Name = 'Luna Park', Founded = datetime.date(2072, 7, 17), Opens = datetime.time(0, 0, 0), Admission = 134.95, )['ID'] # Animals leopardid = self.schema['Animal'].insert(Species='Leopard', Lifespan=73.5, Age=datetime.timedelta(365 * 10))['ID'] self.assertEqual(leopardid, 1) self.schema['Animal'].save(ID=leopardid, ZooID=wap, LastEscape=datetime.datetime(2004, 12, 21, 8, 15, 0, 999907)) lion = self.schema['Animal'].insert(Species='Lion', ZooID=wap)['ID'] self.schema['Animal'].insert(Species='Slug', Legs=1, Lifespan=.75, # Test our 8000-byte limit (ok, 7900) PreviousZoos=["f" * (7900 - 14)]) tiger = self.schema['Animal'].insert(Species='Tiger', ZooID=sdz, PreviousZoos=['animal\\universe'])['ID'] # Override Legs.default with itself just to make sure it works. self.schema['Animal'].insert(Species='Bear', Legs=4) # Notice that ostrich.PreviousZoos is [], whereas leopard is None. self.schema['Animal'].insert(Species='Ostrich', Legs=2, PreviousZoos=[], Lifespan=103.2) self.schema['Animal'].insert(Species='Centipede', Legs=100) emp = self.schema['Animal'].insert(Species='Emperor Penguin', Legs=2, ZooID=seaworld)['ID'] adelie = self.schema['Animal'].insert(Species='Adelie Penguin', Legs=2, ZooID=seaworld)['ID'] self.schema['Animal'].insert(Species='Millipede', Legs=1000000, ZooID=sdz, PreviousZoos=['Wild Animal Park']) # Add a mother and child to test relationships bai_yun = self.schema['Animal'].insert(Species='Ape', Name='Bai Yun', Legs=2) self.schema['Animal'].insert(Species='Ape', Name='Hua Mei', Legs=2, MotherID=bai_yun['ID']) # Exhibits self.schema['Exhibit'].insert(Name = 'The Penguin Encounter', ZooID = seaworld, Animals = [emp, adelie], PettingAllowed = True, Acreage = 3.1, # See http://www.aminus.net/dejavu/ticket/45 Creators = (u'Richard F\xfcrst', u'Sonja Martin'), ) self.schema['Exhibit'].insert(Name = 'Tiger River', ZooID = sdz, Animals = [tiger], PettingAllowed = False, Acreage = 4, ) # Vets cs = self.schema['Vet'].insert(Name = 'Charles Schroeder', ZooID = sdz) self.assertEqual(cs['ID'], self.schema['Vet']['ID'].initial) jm = self.schema['Vet'].insert(Name = 'Jim McBain', ZooID = seaworld)['ID'] # Visits for d in every13days: self.schema['Visit'].insert(VetID=cs['ID'], AnimalID=tiger, Date=d) for d in every17days: self.schema['Visit'].insert(VetID=jm, AnimalID=emp, Date=d) # Foods dead_fish = self.schema['Food'].insert(Name="Dead Fish", Nutrition=5)['ID'] live_fish = self.schema['Food'].insert(Name="Live Fish", Nutrition=10)['ID'] bunnies = self.schema['Food'].insert(Name="Live Bunny Wabbit", Nutrition=10)['ID'] steak = self.schema['Food'].insert(Name="T-Bone", Nutrition=7)['ID'] # Foods --> add preferred and alternate foods self.schema['Animal'].save(ID=lion, PreferredFoodID=steak, AlternateFoodID=bunnies) self.schema['Animal'].save(ID=tiger, PreferredFoodID=bunnies, AlternateFoodID=steak) self.schema['Animal'].save(ID=emp, PreferredFoodID=live_fish, AlternateFoodID=dead_fish) self.schema['Animal'].save(ID=adelie, PreferredFoodID=live_fish, AlternateFoodID=dead_fish) def test_03_Properties(self): # Zoos WAP = self.schema['Zoo'].select(Name='Wild Animal Park') self.assertEqual(WAP['Founded'], datetime.date(2000, 1, 1)) self.assertEqual(WAP['Opens'], datetime.time(8, 15, 59)) if typerefs.fixedpoint: self.assertEqual(WAP['Admission'], typerefs.fixedpoint.FixedPoint("4.95")) else: self.assertEqual(WAP['Admission'], 4.95) SDZ = self.schema['Zoo'].select(Founded=datetime.date(1835, 9, 13)) self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13)) self.assertEqual(SDZ['Opens'], datetime.time(9, 0, 0)) self.assertEqual(SDZ['LastEscape'], None) self.assertEqual(float(SDZ['Admission']), 0) Biodome = self.schema['Zoo'].select(Name=u'Montr\xe9al Biod\xf4me') self.assertEqual(Biodome['Name'], u'Montr\xe9al Biod\xf4me') self.assertEqual(Biodome['Founded'], datetime.date(1992, 6, 19)) self.assertEqual(Biodome['Opens'], datetime.time(9, 0, 0)) self.assertEqual(Biodome['LastEscape'], None) self.assertEqual(float(Biodome['Admission']), 11.75) if typerefs.fixedpoint: seaworld = self.schema['Zoo'].select(lambda z: z.Admission == typerefs.fixedpoint.FixedPoint(60)) else: seaworld = self.schema['Zoo'].select(lambda z: z.Admission == float(60)) self.assertEqual(seaworld['Name'], u'Sea_World') # Animals leopard = self.schema['Animal'].select(lambda a: a.Species == 'Leopard') self.assertEqual(leopard['Species'], 'Leopard') self.assertEqual(leopard['Legs'], 4) self.assertEqual(leopard['Lifespan'], 73.5) self.assertEqual(leopard['ZooID'], WAP['ID']) self.assertEqual(leopard['PreviousZoos'], None) ostrich = self.schema['Animal'].select(Species='Ostrich') self.assertEqual(ostrich['Species'], 'Ostrich') self.assertEqual(ostrich['Legs'], 2) self.assertEqual(ostrich['ZooID'], None) self.assertEqual(ostrich['PreviousZoos'], []) self.assertEqual(ostrich['LastEscape'], None) millipede = self.schema['Animal'].select(Legs=1000000) self.assertEqual(millipede['Species'], 'Millipede') self.assertEqual(millipede['Legs'], 1000000) self.assertEqual(millipede['ZooID'], SDZ['ID']) self.assertEqual(millipede['PreviousZoos'], [WAP['Name']]) self.assertEqual(millipede['LastEscape'], None) # Test that strings in a list get decoded correctly. # See http://projects.amor.org/dejavu/ticket/50 tiger = self.schema['Animal'].select(Species='Tiger') self.assertEqual(tiger['PreviousZoos'], ["animal\\universe"]) # Test our 8000-byte limit (ok, 7900; the row is too long) # len(pickle.dumps(["f" * (7900 - 14)]) == 7900 slug = self.schema['Animal'].select(Species='Slug') self.assertEqual(len(slug['PreviousZoos'][0]), 7900 - 14) # Exhibits exes = self.schema['Exhibit'].select_all() self.assertEqual(len(exes), 2) if exes[0]['Name'] == 'The Penguin Encounter': pe = exes[0] tr = exes[1] else: pe = exes[1] tr = exes[0] self.assertEqual(pe['ZooID'], seaworld['ID']) self.assertEqual(len(pe['Animals']), 2) self.assertEqual(float(pe['Acreage']), 3.1) self.assertEqual(pe['PettingAllowed'], True) self.assertEqual(pe['Creators'], (u'Richard F\xfcrst', u'Sonja Martin')) self.assertEqual(tr['ZooID'], SDZ['ID']) self.assertEqual(len(tr['Animals']), 1) self.assertEqual(float(tr['Acreage']), 4) self.assertEqual(tr['PettingAllowed'], False) def test_04_Expressions(self): def matches(lam, tkey='Animal'): data = self.schema[tkey].select_all(lam) return len(data) self.assertEqual(matches(None, 'Zoo'), 5) self.assertEqual(matches(lambda x: True), 12) self.assertEqual(matches(lambda x: x.Legs == 4), 4) self.assertEqual(matches(lambda x: x.Legs == 2), 5) self.assertEqual(matches(lambda x: x.Legs >= 2 and x.Legs < 20), 9) self.assertEqual(matches(lambda x: x.Legs > 10), 2) self.assertEqual(matches(lambda x: x.Lifespan > 70), 2) self.assertEqual(matches(lambda x: x.Species.startswith('L')), 2) self.assertEqual(matches(lambda x: x.Species.endswith('pede')), 2) self.assertEqual(matches(lambda x: x.LastEscape != None), 1) self.assertEqual(matches(lambda x: x.LastEscape is not None), 1) self.assertEqual(matches(lambda x: None == x.LastEscape), 11) # In operator (containedby) self.assertEqual(matches(lambda x: 'pede' in x.Species), 2) self.assertEqual(matches(lambda x: x.Species in ('Lion', 'Tiger', 'Bear')), 3) # Try In with cell references class thing(object): pass pet, pet2 = thing(), thing() pet.Name, pet2.Name = 'Slug', 'Ostrich' self.assertEqual(matches(lambda x: x.Species in (pet.Name, pet2.Name)), 2) # logic and other functions self.assertEqual(matches(lambda x: ieq(x.Species, 'slug')), 1) self.assertEqual(matches(lambda x: icontains(x.Species, 'PEDE')), 2) self.assertEqual(matches(lambda x: icontains(('Lion', 'Banana'), x.Species)), 1) f = lambda x: icontainedby(x.Species, ('Lion', 'Bear', 'Leopard')) self.assertEqual(matches(f), 3) name = 'Lion' self.assertEqual(matches(lambda x: len(x.Species) == len(name)), 3) # This broke sometime in 2004. Rev 32 seems to have fixed it. self.assertEqual(matches(lambda x: 'i' in x.Species), 7) # Test now(), today(), year(), month(), day() self.assertEqual(matches(lambda x: x.Founded != None and x.Founded < today(), 'Zoo'), 3) self.assertEqual(matches(lambda x: x.LastEscape == now()), 0) self.assertEqual(matches(lambda x: year(x.LastEscape) == 2004), 1) self.assertEqual(matches(lambda x: month(x.LastEscape) == 12), 1) self.assertEqual(matches(lambda x: day(x.LastEscape) == 21), 1) # Test AND, OR with CannotRepresent. # Notice that we reference a method ('count') which no # known SM handles, so it will default back to Expr.eval(). # [u'Leopard', u'Slug', u'Bear', u'Ostrich', u'Centipede', # u'Millipede', u'Ape', u'Ape', u'Lion', u'Tiger', # u'Emperor Penguin', u'Adelie Penguin'] e = lambda x: 'p' in x.Species and x.Species.count('e') > 1 self.assertEqual(set([a['Species'] for a in self.schema['Animal'].select_all(e)]), set(['Centipede', 'Millipede', 'Emperor Penguin'])) # This broke in MSAccess (storeado) in April 2005, due to a bug in # SQLDecompiler.visit_CALL_FUNCTION (append TOS, not replace!). e = logic.Expression(lambda x, **kw: x.LastEscape != None and x.LastEscape >= datetime.datetime(kw['Year'], 12, 1) and x.LastEscape < datetime.datetime(kw['Year'], 12, 31) ) e.bind_args(Year=2004) self.assertEqual(matches(e), 1) # Test wildcards in LIKE. This fails with SQLite <= 3.0.8, # so make sure it's always at the end of this method so # it doesn't preclude running the other tests. self.assertEqual(matches(lambda x: "_" in x.Name, 'Zoo'), 1) # Temporarily insert a zoo we can test the escaping of # % symbols in LIKE queries some_other_zoo = self.schema['Zoo'].insert(Name = 'Other%%Zoo', Admission = 60)['ID'] try: self.assertEqual(matches(lambda x: "%" in x.Name, 'Zoo'), 1) finally: self.schema['Zoo'].delete(ID=some_other_zoo) # I noticed this failed on PostgreSQL when testing Table.delete_all. # Granted, not all float comparisons should work perfectly # (and we should mark more of them imperfect), but a straight # comparison with a known INSERTed value should probably work. self.assertEqual(matches(lambda x: x.Lifespan == 103.2), 1) # Test scalar columns data = self.db.select((self.schema['Zoo'], lambda z: [z.ID, 'foo'])) self.assertEqual(list(data), [[1, 'foo'], [2, 'foo'], [3, 'foo'], [4, 'foo'], [5, 'foo'], ]) # Test SELECT from no table at all data = self.db.select((self.schema, lambda: ['bar'])).scalar() self.assertEqual(data, 'bar') def assertEqualSecs(self, dt1, dt2, tolerance=1): """Assert that the two datetimes are within the given tolerance (secs).""" diff = abs(dt1 - dt2) self.assert_(diff < datetime.timedelta(0, tolerance), "%r and %r should be less than %s seconds apart, " "but differ by %r instead." % (dt1, dt2, tolerance, diff)) def test_04a_Binary_Expressions(self): def results(tablekey, fieldlist, restriction=None): data = list(self.db.select((self.schema[tablekey], fieldlist, restriction))) data.sort() return data td = datetime.timedelta # --------------------------- datetime --------------------------- # lastescape = self.schema['Zoo'].select(Name='Wild Animal Park')['LastEscape'] # datetime + timedelta -> datetime nextmonth = results('Zoo', lambda z: [z.LastEscape + td(28)], lambda z: z.LastEscape != None) self.assertEqual(nextmonth, [[lastescape + td(28)]]) # datetime - timedelta -> datetime ago = results('Zoo', lambda z: [z.LastEscape - td(14, 32)], lambda z: z.LastEscape != None) self.assertEqual(ago, [[lastescape - td(14, 32)]]) # timedelta + datetime -> datetime ago = results('Zoo', lambda z: [td(965, 86333, 300) + z.LastEscape], lambda z: z.LastEscape != None) self.assertEqualSecs(ago[0][0], td(965, 86333, 300) + lastescape) # datetime - datetime -> timedelta (including the 'now' func) elapsed = results('Zoo', lambda z: (z.Name, now() - z.LastEscape)) present = datetime.datetime.now() self.assertEqual(elapsed[:4], [[u'Luna Park', None], [u'Montr\xe9al Biod\xf4me', None], [u'San Diego Zoo', None], [u'Sea_World', None], ] ) self.assertEqualSecs(elapsed[4][1], present - lastescape, tolerance=2) # -------------------------- timedelta -------------------------- # # timedelta + timedelta -> timedelta leo_age = self.schema['Animal'].select(Species='Leopard')['Age'] for days in [1000, 10, 1, 0, -1, -10, -500, -1000]: for secs in (0, 1, 33, 100, 1000, 32767, 86399): diff = td(days, secs) data = results('Animal', lambda z: [z.Age + diff], lambda z: z.LastEscape != None) self.assertEqual(data[0][0], leo_age + diff) # timedelta - timedelta -> timedelta leo_age = self.schema['Animal'].select(Species='Leopard')['Age'] for days in [1000, 10, 1, 0, -1, -10, -500, -1000]: for secs in (0, 1, 33, 100, 1000, 32767, 86399): diff = td(days, secs) data = results('Animal', lambda z: [z.Age - diff], lambda z: z.LastEscape != None) self.assertEqual(data[0][0], leo_age - diff) # ----------------------------- date ----------------------------- # founded = results('Zoo', ['Name', 'Founded'], lambda z: z.Founded != None) # date + timedelta -> date data = results('Zoo', lambda z: (z.Name, z.Founded + td(365)), lambda z: z.Founded != None) self.assertEqual(data, [[name, f + td(365)] for name, f in founded]) # date - timedelta -> date data = results('Zoo', lambda z: (z.Name, z.Founded - td(14, 32)), lambda z: z.Founded != None) self.assertEqual(data, [[name, f - td(14, 32)] for name, f in founded]) # timedelta + date -> date data = results('Zoo', lambda z: (z.Name, td(965, 86333, 300) + z.Founded), lambda z: z.Founded != None) self.assertEqual(data, [[name, td(965, 86333, 300) + f] for name, f in founded]) # date - date -> timedelta data = results('Zoo', lambda z: (z.Name, z.Founded - datetime.date(1999, 12, 31)), lambda z: z.Founded != None) self.assertEqual(data, [[name, f - datetime.date(1999, 12, 31)] for name, f in founded]) # date - date -> timedelta (including the 'today' func) # If this fails only after midnight GMT, then your provider's # "today" function is probably using UTC, not local time. data = results('Zoo', lambda z: (z.Name, today() - z.Founded), lambda z: z.Founded != None) self.assertEqual(data, [[name, datetime.date.today() - f] for name, f in founded]) # ----------------------------- time ----------------------------- # # TODO - STRABS: Get the time tests working. #opens = results('Zoo', ['Name', 'Opens'], lambda z: z.Opens != None) # time + timedelta -> time #data = results('Zoo', lambda z: (z.Name, z.Opens + td(0, 3600*4)), #lambda z: z.Opens != None) #self.assertEqual(data, [[name, o + td(0, 3600*4)] for name, o in opens]) # time - timedelta -> time #data = results('Zoo', lambda z: (z.Name, z.Opens - td(0, 60*45)), #lambda z: z.Opens != None) #self.assertEqual(data, [[name, o - td(0, 60*45)] for name, o in opens]) # timedelta + time -> time #data = results('Zoo', lambda z: (z.Name, td(0, 300) + z.Opens), #lambda z: z.Opens != None) #self.assertEqual(data, [[name, td(0, 300) + o] #for name, o in opens]) # time - time -> timedelta #data = results('Zoo', #lambda z: (z.Name, z.Opens - datetime.time(1999, 12, 31)), #lambda z: z.Opens != None) #self.assertEqual(data, [[name, f - datetime.time(1999, 12, 31)] #for name, f in opens]) def test_05_Aggregates(self): try: Animal = self.schema['Animal'] Visit = self.schema['Visit'] Vet = self.schema['Vet'] Zoo = self.schema['Zoo'] # Subset of columns legs = [l for l, in self.db.select((Animal, ['Legs']))] legs.sort() self.assertEqual(legs, [1, 2, 2, 2, 2, 2, 4, 4, 4, 4, 100, 1000000]) expected = {'Leopard': 73.5, 'Slug': .75, 'Tiger': None, 'Lion': None, 'Bear': None, 'Ostrich': 103.2, 'Centipede': None, 'Emperor Penguin': None, 'Adelie Penguin': None, 'Millipede': None, 'Ape': None, } for species, lifespan in self.db.select((Animal, ['Species', 'Lifespan'])): if expected[species] is None: self.assertEqual(lifespan, None) else: self.assertAlmostEqual(expected[species], lifespan, places=5) expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park'] e = (lambda x: x.Founded != None and x.Founded <= today() and x.Founded >= datetime.date(1990, 1, 1)) values = [val[0] for val in self.db.select((self.schema['Zoo'], ['Name'], e))] for name in expected: self.assert_(name in values) # distinct legs = [x[0] for x in self.db.select((Animal, ['Legs']), distinct=True)] legs.sort() self.assertEqual(legs, [1, 2, 4, 100, 1000000]) # This may raise a warning on some DB's. f = (lambda x: x.Species == 'Lion') lionlegs = self.db.select((Animal, ['Legs'], f), distinct=True) self.assertEqual(list(lionlegs), [[4]]) # Test attribute lambdas visits = self.db.select((Animal << Visit << Vet, lambda a, v, vet: (a.Species, vet.Name), lambda a, v, vet: vet.Name != None), distinct = True) visits = list(visits) visits.sort() self.assertEqual(visits, [[u'Emperor Penguin', u'Jim McBain'], [u'Tiger', u'Charles Schroeder']]) # Test attribute lambdas with GROUP BY firstvisit = self.db.select((Animal << Visit, lambda a, v: (a.Species, min(v.Date)))) firstvisit = list(firstvisit) firstvisit.sort() self.assertEqual(firstvisit, [[u'Adelie Penguin', None], [u'Ape', None], [u'Bear', None], [u'Centipede', None], [u'Emperor Penguin', datetime.date(2001, 1, 1)], [u'Leopard', None], [u'Lion', None], [u'Millipede', None], [u'Ostrich', None], [u'Slug', None], [u'Tiger', datetime.date(2001, 1, 1)]] ) # Test implicit global funcs escapes = self.db.select((Animal, lambda a: (day(a.LastEscape),), lambda a: a.LastEscape != None)) escapes = list(escapes) escapes.sort() self.assertEqual(escapes, [[21]]) # If Python 2.4+, the above 3 lines could just be written: # self.assertEqual(sorted(escapes), [[21]]) # 'count' agg func visits = self.db.select((Animal << Visit, lambda a, v: (a.Species, count(v.Date)))) visits = list(visits) visits.sort() self.assertEqual(visits, [[u'Adelie Penguin', 0], [u'Ape', 0], [u'Bear', 0], [u'Centipede', 0], [u'Emperor Penguin', 20], [u'Leopard', 0], [u'Lion', 0], [u'Millipede', 0], [u'Ostrich', 0], [u'Slug', 0], [u'Tiger', 20]] ) # count agg func with no grouping animals = self.db.select((Animal, lambda a: [count(a.ID)])) self.assertEqual(list(animals), [[12]]) # 'now' func wap = Zoo.select(Name='Wild Animal Park') WAP_elapsed = (datetime.datetime.now() - wap['LastEscape']) elapsed = self.db.select((Zoo, lambda z: (z.Name, now() - z.LastEscape))) elapsed = list(elapsed) elapsed.sort() self.assertEqual(elapsed[:4], [[u'Luna Park', None], [u'Montr\xe9al Biod\xf4me', None], [u'San Diego Zoo', None], [u'Sea_World', None], ] ) # Assert that the expected interval and actual interval are # no more than 5 second apart. diff = abs(elapsed[4][1] - WAP_elapsed) self.assert_(diff <= datetime.timedelta(0, 5), diff) finally: self.db.connections.commit() def test_06_Editing(self): Zoo = self.schema['Zoo'] # Edit SDZ = Zoo.select(Name='San Diego Zoo') Zoo.save(ID=SDZ['ID'], Name='The San Diego Zoo', Founded = datetime.date(1900, 1, 1), Opens = datetime.time(7, 30, 0), Admission = "35.00") # Test edits SDZ = Zoo.select(Name='The San Diego Zoo') self.assertEqual(SDZ['Name'], 'The San Diego Zoo') self.assertEqual(SDZ['Founded'], datetime.date(1900, 1, 1)) self.assertEqual(SDZ['Opens'], datetime.time(7, 30, 0)) if typerefs.fixedpoint: self.assertEqual(SDZ['Admission'], typerefs.fixedpoint.FixedPoint(35, 2)) else: self.assertEqual(SDZ['Admission'], 35.0) # Change it back Zoo.save(ID=SDZ['ID'], Name = 'San Diego Zoo', Founded = datetime.date(1835, 9, 13), Opens = datetime.time(9, 0, 0), Admission = "0") # Test re-edits SDZ = Zoo.select(Name='San Diego Zoo') self.assertEqual(SDZ['Name'], 'San Diego Zoo') self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13)) self.assertEqual(SDZ['Opens'], datetime.time(9, 0, 0)) if typerefs.fixedpoint: self.assertEqual(SDZ['Admission'], typerefs.fixedpoint.FixedPoint(0, 2)) else: self.assertEqual(SDZ['Admission'], 0.0) # Test a range of dates near 1900. I had some trouble with date # values near MSAccess' epoch (1899, 12, 30). # TODO: add an 'epoch' attribute to each DB with native dates # and test them all like this. WAPID = Zoo.select(Name='Wild Animal Park')['ID'] epoch = datetime.datetime(1899, 12, 30) for days in range(-5, +6): for secs in (0, 1, 33, 100, 1000, 32767, 86399): d = epoch + datetime.timedelta(days, secs) Zoo.save(ID=WAPID, LastEscape=d) data = self.db.select((Zoo, ['LastEscape'], lambda z: z.ID == WAPID)) self.assertEqual(list(data)[0][0], d) def test_06a_Edit_expressions(self): Vet = self.schema['Vet'] # Set all Vet.NameLen using a lambda for vet in Vet.select_all(): del vet['NameLen'] Vet.save(NameLen=lambda v: len(v.Name), **vet) for vet in Vet.select_all(): self.assertEqual(vet['NameLen'], len(vet['Name'])) # Try a lambda with insert SDZ = self.schema['Zoo'].select(Name='San Diego Zoo') gp = Vet.insert(Name = 'Geoff Pye', ZooID = SDZ['ID'], NameLen = lambda v: 2 + 3) gp = Vet.select(ID=gp['ID']) self.assertEqual(gp['NameLen'], 5) def test_07_Multiselect(self): try: f = (lambda z, a: z.Name == 'San Diego Zoo') zooed_animals = list(self.db.select((self.schema['Zoo'] & self.schema['Animal'], [('ID',), self.schema['Animal'].keys()], f))) self.assertEqual(len(zooed_animals), 2) SDZ = self.schema['Zoo'].select(Name='San Diego Zoo') d = [] for row in zooed_animals: self.assertEqual(row[0], SDZ['ID']) self.assertNotEqual(row, d) d = row # Assert that multiselects with no matching related units returns # no matches for the initial class (if joins are INNER). # We're also going to test that you can combine a one-arg expr # with a two-arg expr. sdexpr = logic.filter(Name='San Diego Zoo') leo = lambda z, a: a.Species == 'Leopard' zooed_animals = list(self.db.select((self.schema['Zoo'] & self.schema['Animal'], [('ID',), ('ID', )], sdexpr + leo))) self.assertEqual(len(zooed_animals), 0) # Now try the same query with INNER, LEFT, and RIGHT JOINs. zooed_animals = list(self.db.select((self.schema['Zoo'] & self.schema['Animal'], [('Name', ), ('Species', )]))) self.assertEqual(len(zooed_animals), 6) self.assertEqualSet([tuple(row) for row in zooed_animals], [("Wild Animal Park", "Leopard"), ("Wild Animal Park", "Lion"), ("San Diego Zoo", "Tiger"), ("San Diego Zoo", "Millipede"), ("Sea_World", "Emperor Penguin"), ("Sea_World", "Adelie Penguin")]) zooed_animals = list(self.db.select((self.schema['Zoo'] >> self.schema['Animal'], [('Name', ), ('Species', )]))) self.assertEqual(len(zooed_animals), 12) self.assertEqualSet([tuple(row) for row in zooed_animals], [("Wild Animal Park", "Leopard"), ("Wild Animal Park", "Lion"), ("San Diego Zoo", "Tiger"), ("San Diego Zoo", "Millipede"), ("Sea_World", "Emperor Penguin"), ("Sea_World", "Adelie Penguin"), (None, "Slug"), (None, "Bear"), (None, "Ostrich"), (None, "Centipede"), (None, "Ape"), (None, "Ape"), ]) zooed_animals = list(self.db.select((self.schema['Zoo'] << self.schema['Animal'], [('Name', ), ('Species', )]))) self.assertEqual(len(zooed_animals), 8) self.assertEqualSet([tuple(row) for row in zooed_animals], [("Wild Animal Park", "Leopard"), ("Wild Animal Park", "Lion"), ("Luna Park", None), ("San Diego Zoo", "Tiger"), ("San Diego Zoo", "Millipede"), ("Sea_World", "Emperor Penguin"), ("Sea_World", "Adelie Penguin"), (u'Montr\xe9al Biod\xf4me', None), ]) # Try a multiple-arg expression f = (lambda a, z: a.Legs >= 4 and z.Admission < 10) animal_zoos = list(self.db.select((self.schema['Animal'] & self.schema['Zoo'], [('Species',), self.schema['Zoo'].keys()], f), strict=False)) self.assertEqual(len(animal_zoos), 4) names = [row[0] for row in animal_zoos] names.sort() self.assertEqual(names, ['Leopard', 'Lion', 'Millipede', 'Tiger']) # Let's try three joined classes just for the sadistic fun of it. tree = (self.schema['Animal'] >> self.schema['Zoo']) >> self.schema['Vet'] f = (lambda a, z, v: z.Name == 'Sea_World') entries = list(self.db.select((tree, [('ID',), ('ID',), ('ID', 'Name')], f))) self.assertEqual(len(entries), 2) # Let's try three joined classes just for the sadistic fun of it. tree = (self.schema['Animal'] & self.schema['Zoo']) & self.schema['Vet'] f = (lambda a, z, v: z.Name == 'Sea_World') entries = list(self.db.select((tree, [('ID',), ('ID',), ('ID', 'Name')], f))) self.assertEqual(len(entries), 2) tree = self.schema['Animal'] << self.schema['Zoo'] f = (lambda a, z, v: z.Name == 'Sea_World') entries = list(self.db.select((tree, [('ID',), ('ID',), ('ID', 'Name')], f))) # THere are only two animals in sea world, so this join will return 2 rows. self.assertEqual(len(entries), 2) # Try mentioning the same class twice. tree = (self.schema['Animal'] << self.schema['Animal']) f = (lambda anim, mother: mother.ID != None) animals = list(self.db.select((tree, [(), ('Name', )], f))) self.assertEqual(animals, [['Hua Mei']]) finally: self.db.connections.commit() def test_08_CustomAssociations(self): try: # Try different association paths std_expected = ['Live Bunny Wabbit', 'Live Fish', 'Live Fish', 'T-Bone'] cus_expected = ['Dead Fish', 'Dead Fish', 'Live Bunny Wabbit', 'T-Bone'] uj = self.schema['Animal'] & self.schema['Food'] for path, expected in [# standard path (None, std_expected), # custom path ('Alternate Food', cus_expected)]: uj.path = path foods = list(self.db.select((uj, [(), ('Name',)]))) self.assertEqualSet([name for name, in foods], expected) finally: self.db.connections.commit() def test_09_delete(self): ostrich = self.schema['Animal'].select(Species='Ostrich') self.assert_(ostrich is not None) self.schema['Animal'].delete(**ostrich) ostrich = self.schema['Animal'].select(Species='Ostrich') self.assertEqual(ostrich, None) # Re-create the ostrich and try deleting it with a non-ID kwarg. self.schema['Animal'].insert(Species='Ostrich', Legs=2, PreviousZoos=[], Lifespan=103.2) ostrich = self.schema['Animal'].select(Species='Ostrich') self.assert_(ostrich is not None) self.schema['Animal'].delete_all(Species='Ostrich') ostrich = self.schema['Animal'].select(Species='Ostrich') self.assertEqual(ostrich, None) def test_10_timezone_aware(self): if self.db.__class__.__name__.endswith('PgDatabase'): # Exercise the timezone aware dbtype supported by postgresql. class Timezone(datetime.tzinfo): def __init__(self, offset=0): self._offset = offset self._timedelta = datetime.timedelta(seconds=offset * 60) def utcoffset(self, dt): return self._timedelta def dst(self, dt): return datetime.timedelta(0) self.schema['Animal'].insert( Species='Eagle', Birthdate=datetime.datetime(2011, 01, 01, 07, 31, 12, 100, tzinfo=Timezone(240)) ) self.schema['Animal'].insert( Species='Beagle', Birthdate=datetime.datetime(2011, 01, 01, 07, 31, 12, 100, tzinfo=Timezone(-240)) ) eagle = self.schema['Animal'].select(Species='Eagle') d = eagle['Birthdate'].astimezone(Timezone(240)) self.assertEqual(2011, d.year) self.assertEqual(01, d.month) self.assertEqual(01, d.day) self.assertEqual(07, d.hour) self.assertEqual(31, d.minute) self.assertEqual(12, d.second) self.assertEqual(100, d.microsecond) beagle = self.schema['Animal'].select(Species='Beagle') d = beagle['Birthdate'].astimezone(Timezone(-240)) self.assertEqual(2011, d.year) self.assertEqual(01, d.month) self.assertEqual(01, d.day) self.assertEqual(07, d.hour) self.assertEqual(31, d.minute) self.assertEqual(12, d.second) self.assertEqual(100, d.microsecond) # Ensure that different timezones between server and client work for selects. data, _ = self.db.fetch("SELECT age(now() at time zone 'UTC', now())", self.db.connections.get()) server_offset = data[0][0].seconds / 60 dtz5 = datetime.datetime(2011, 1, 1, 13, 30, 30, 0, tzinfo=Timezone(-server_offset)) dtz6 = datetime.datetime(2011, 1, 1, 12, 30, 30, 0, tzinfo=Timezone(-server_offset - 120)) dtzn = datetime.datetime(2011, 1, 1, 12, 30, 30, 0) self.schema['Animal'].insert(Species='Cows', Birthdate=dtz5) # Using a timezone aware filter should exclude all cows. expr = logic.Expression(lambda x: x.Birthdate >= dtz6 and x.Species == 'Cows') self.assertEqual(None, self.schema['Animal'].select(expr)) # Using a timezone naive filter should include all cows. expr = logic.Expression(lambda x: x.Birthdate >= dtzn and x.Species == 'Cows') cow = self.schema['Animal'].select(expr) self.assertEqual('Cows', cow['Species']) def test_primary_key_support(self): # Drop and re-add the PK on, oh, how about the Animal table? Animal = self.schema['Animal'] Animal.set_primary() Animal = self.schema.discover(Animal.name) if self.schema.db.pks_must_be_indexed: self.assertEqual(len(Animal.indices), 2) else: # Since we did not add an index on Animal.ID... self.assertEqual(len(Animal.indices), 1) def test_insert_into(self): newtable = self.db.insert_into('fishers', (self.schema['Animal'] << self.schema['Food'], [('ID', 'Species', 'ZooID'), ('ID', 'Name')], lambda a, f: f.Name == 'Live Fish')) self.assertEqualSet(newtable.keys(), ['ID', 'Species', 'ZooID', 'Food_ID', 'Name']) results = newtable.select_all() results.sort() self.assertEqual(results, [{'Food_ID': 2, 'ZooID': 4, 'Name': u'Live Fish', 'ID': 8, 'Species': u'Emperor Penguin'}, {'Food_ID': 2, 'ZooID': 4, 'Name': u'Live Fish', 'ID': 9, 'Species': u'Adelie Penguin'}] ) def test_order_by(self): d = datetime.date # We're testing several things, here: # 1. The basic self.db.select call # 2. ORDER BY with projection # 3. ORDER BY when projection is in a different column order self.assertEqual(list(self.db.select((self.schema['Zoo'], ['Founded', 'Name'], None), order=lambda z: [z.Name])), [[d(2072, 7, 17), u'Luna Park'], [d(1992, 6, 19), u'Montr\xe9al Biod\xf4me'], [d(1835, 9, 13), u'San Diego Zoo'], [None, u'Sea_World'], [d(2000, 1, 1), u'Wild Animal Park'], ]) # Test ORDER BY for the basic select_all method zoos = self.schema['Zoo'].select_all(order=lambda z: [z.Name]) self.assertEqual([z['Name'] for z in zoos], [u'Luna Park', u'Montr\xe9al Biod\xf4me', u'San Diego Zoo', u'Sea_World', u'Wild Animal Park']) # Test reversed() zoos = self.schema['Zoo'].select_all(order=lambda z: [reversed(z.Name)]) self.assertEqual([z['Name'] for z in zoos], [u'Wild Animal Park', u'Sea_World', u'San Diego Zoo', u'Montr\xe9al Biod\xf4me', u'Luna Park']) # Test the list format (instead of lambda) for query.order zoos = self.schema['Zoo'].select_all(order=['Name']) self.assertEqual([z['Name'] for z in zoos], [u'Luna Park', u'Montr\xe9al Biod\xf4me', u'San Diego Zoo', u'Sea_World', u'Wild Animal Park']) # Test the 'limit' option zoos = self.schema['Zoo'].select_all(order=['Name'], limit=3) self.assertEqual([z['Name'] for z in zoos], [u'Luna Park', u'Montr\xe9al Biod\xf4me', u'San Diego Zoo']) # Test limit with CannotRepresent. # Notice that we reference a method ('count') which no # known SM handles, so it will default back to Expr.eval(). # Since the expr is imperfect, the provider MUST return # more rows than specified by limit, which means the # select_all method must do its own limiting. # TODO - STRABS: This appears to have been incorrectly ported from # dejavu to here - according to the docs, this isn't # supported yet. What it's trying to do is to use as # much of the SQL expression as is possible and passing that # off to the DB, but overselecting the rows. IE, we'll get # all the rows we wanted, but also potentially more rows. # somewhere along the lines we are supposed to be using the # expression object to evaluate each row individually and # discarding those that don't pass. For now, this test is # invalid. #data = self.schema['Animal'].select_all( #lambda x: 'p' in x.Species and x.Species.count('e') > 1, #limit=2) #self.assertEqual(len(data), 2) def test_view_objects(self): # Create a View s = geniusql.Statement((self.schema['Visit'] << self.schema['Vet'] << self.schema['Animal'], lambda v, vet, a: [v.ID, v.Date, vet.Name, a.Species], lambda v, vet, a: a.Legs == 4 and month(v.Date) > 5)) if self.db.ordered_views: s.order = lambda v, vet, a: [reversed(v.Date)] self.schema['VisitView'] = VisitView = self.schema.view('VisitView', s) # Read from it. Note the restriction attenuates the one in the view. if self.db.ordered_views: data = VisitView.select_all(lambda v: month(v.Date) < 9) else: # Microsoft Access doesn't allow ORDER BY in a VIEW. data = VisitView.select_all(lambda v: month(v.Date) < 9, order=["Date DESC"]) self.assertEqual(data, [ ## [{'Date': datetime.date(2001, 9, 5), 'Name': u'Charles Schroeder', ## 'Species': 'Tiger', 'ID': 20}, {'Date': datetime.date(2001, 8, 23), 'Name': u'Charles Schroeder', 'Species': 'Tiger', 'ID': 19}, {'Date': datetime.date(2001, 8, 10), 'Name': u'Charles Schroeder', 'Species': 'Tiger', 'ID': 18}, {'Date': datetime.date(2001, 7, 28), 'Name': u'Charles Schroeder', 'Species': 'Tiger', 'ID': 17}, {'Date': datetime.date(2001, 7, 15), 'Name': u'Charles Schroeder', 'Species': 'Tiger', 'ID': 16}, {'Date': datetime.date(2001, 7, 2), 'Name': u'Charles Schroeder', 'Species': 'Tiger', 'ID': 15}, {'Date': datetime.date(2001, 6, 19), 'Name': u'Charles Schroeder', 'Species': 'Tiger', 'ID': 14}, {'Date': datetime.date(2001, 6, 6), 'Name': u'Charles Schroeder', 'Species': 'Tiger', 'ID': 13}, ## {'Date': datetime.date(2001, 5, 24), 'Name': u'Charles Schroeder', ## 'Species': 'Tiger', 'ID': 12}, ... ]) ## ## def test_zzz_Schema_Upgrade(self): ## # Must run last. ## zs = ZooSchema(arena) ## ## # In this first upgrade, we simulate the case where the code was ## # upgraded, and the database self.schema upgrade performed afterward. ## # The Schema.latest property is set, and upgrade() is called with ## # no argument (which should upgrade us to "latest"). ## Animal.set_property("ExhibitID") ## # Test numeric default (see hack in storeado for MS Access). ## prop = Animal.set_property("Stomachs", int) ## prop.default = 1 ## zs.latest = 2 ## zs.upgrade() ## ## # In this example, we simulate the developer who wants to put ## # model changes inline with database changes (see upgrade_to_3). ## # We do not set latest, but instead supply an arg to upgrade(). ## zs.upgrade(3) ## ## # Test that Animals have a new "Family" property, and an ExhibitID. ## box = arena.new_sandbox() ## try: ## emp = box.unit(Animal, Family='Emperor Penguin') ## self.assertEqual(emp.ExhibitID, 'The Penguin Encounter') ## finally: ## box.flush_all() class Phase2_IsolationTests(ZooTestsBaseClass): verbose = False _transid = 0 def setUp(self): try: self.old_implicit = self.db.connections.implicit_trans self.db.connections.implicit_trans = False self.old_tkey = self.db.connections.id # Use an explicit 'transid' for the transaction key self.db.connections.id = lambda: self.transid except AttributeError: self.old_implicit = None def tearDown(self): if self.old_implicit is not None: self.db.connections.implicit_trans = self.old_implicit self.db.connections.id = self.old_tkey def restore(self): self.transid = 0 self.db.connections.start() try: jim = self.schema['Vet'].select(Name = 'Jim McBain') self.schema['Vet'].save(ID = jim['ID'], City = None) except: self.db.connections.rollback() raise else: self.db.connections.commit() def cleanup_boxes(self): try: self.transid = 1 self.db.connections.rollback() except: pass try: self.transid = 2 self.db.connections.rollback() except: pass def attempt(self, testfunc, anomaly_name, level): print ".", self.restore() self.transid = 1 self.db.connections.start(level) self.transid = 2 self.db.connections.start(level) try: testfunc(level) except AssertionError: self.cleanup_boxes() if level.forbids(anomaly_name): warnings.warn("%r allowed anomaly %r." % (level, anomaly_name)) except: if self.db.is_timeout_error(sys.exc_info()[1]): self.cleanup_boxes() if not level.forbids(anomaly_name): warnings.warn("%r prevented anomaly %r with an error." % (level, anomaly_name)) else: self.cleanup_boxes() raise else: self.cleanup_boxes() if not level.forbids(anomaly_name): warnings.warn("%r prevented anomaly %r." % (level, anomaly_name)) def _get_transid(self): return self._transid def _set_transid(self, val): if self.verbose: print val, self._transid = val transid = property(_get_transid, _set_transid) def test_dirty_read(self): def dirty_read(level): # Write City 1 self.transid = 1 jim1 = self.schema['Vet'].select(Name = 'Jim McBain') self.schema['Vet'].save(ID = jim1['ID'], City = "Addis Ababa") # Read City 2. self.transid = 2 jim2 = self.schema['Vet'].select(Name = 'Jim McBain') # If READ UNCOMMITTED or lower, this should fail assert jim2['City'] is None for level in geniusql.isolation.levels: if self.verbose: print print level, if level.name in self.db.connections.isolation_levels: self.attempt(dirty_read, "Dirty Read", level) def test_nonrepeatable_read(self): def nonrepeatable_read(level): # Read City 1 self.transid = 1 jim1 = self.schema['Vet'].select(Name = 'Jim McBain') val1 = jim1['City'] assert val1 is None # Write City 2. self.transid = 2 jim2 = self.schema['Vet'].select(Name = 'Jim McBain') self.schema['Vet'].save(ID=jim2['ID'], City = "Tehachapi") self.db.connections.commit() # Re-read City 1 self.transid = 1 jim1 = self.schema['Vet'].select(Name = 'Jim McBain') # If READ COMMITTED or lower, this should fail assert jim1['City'] == val1 for level in geniusql.isolation.levels: if self.verbose: print print level, if level.name in self.db.connections.isolation_levels: self.attempt(nonrepeatable_read, "Nonrepeatable Read", level) def test_phantom(self): def phantom(level): # Read City 1 self.transid = 1 pvets = self.schema['Vet'].select_all(City = 'Poughkeepsie') assert len(pvets) == 0 # Write City 2. self.transid = 2 jim2 = self.schema['Vet'].select(Name = 'Jim McBain') self.schema['Vet'].save(ID = jim2['ID'], City = "Poughkeepsie") self.db.connections.commit() # Re-read City 1 self.transid = 1 pvets = self.schema['Vet'].select_all(City = 'Poughkeepsie') # If REPEATABLE READ or lower, this should fail assert len(pvets) == 0 for level in geniusql.isolation.levels: if self.verbose: print print level, if level.name in self.db.connections.isolation_levels: self.attempt(phantom, "Phantom", level) class Phase2_ConcurrencyTests(ZooTestsBaseClass): def test_Multithreading(self): if self.db.__class__.__name__ == 'SQLiteDatabase': print "These multithreading tests will often times segfault python with sqlite." if raw_input("Are you sure you want to continue (Y/n)").lower().startswith("n"): print "Skipping" return f = lambda x: x.Legs == 4 and x.Lifespan is not None def box_per_thread(): # Notice that, although we write changes in each thread, # we only assert the unchanged data, since the order of # thread execution can not be guaranteed. quadrupeds = self.schema['Animal'].select_all(f) self.assertEqual(len(quadrupeds), 1) first = quadrupeds[0] self.schema['Animal'].save(ID=first['ID'], Lifespan = first['Lifespan'] + 1.0) ts = [] # PostgreSQL, for example, has a default max_connections of 100. for x in range(99): t = threading.Thread(target=box_per_thread) t.start() ts.append(t) for t in ts: t.join() class Phase2_TransactionTests(ZooTestsBaseClass): def test_Implicit_Transactions(self): old_implicit = self.db.connections.implicit_trans try: def commit_test(): """Test transaction commit.""" try: try: now = datetime.time(8, 18, 28) WAP = self.schema['Zoo'].select(Name='Wild Animal Park') self.schema['Zoo'].save(ID=WAP['ID'], Opens = now) self.db.connections.commit() WAP = self.schema['Zoo'].select(Name='Wild Animal Park') self.assertEqual(WAP['Opens'], now) except: traceback.print_exc() raise finally: self.db.connections.commit() def rollback_test(): """Test transaction rollback.""" try: SDZ = self.schema['Zoo'].select(Name='San Diego Zoo') self.schema['Zoo'].save(ID=SDZ['ID'], Name = 'The One and Only San Diego Zoo', Founded = datetime.date(2039, 9, 13)) self.db.connections.rollback() SDZ = self.schema['Zoo'].select(Name='San Diego Zoo') self.assertEqual(SDZ['Name'], 'San Diego Zoo') self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13)) finally: self.db.connections.commit() self.db.connections.implicit_trans = True commit_test() rollback_test() self.db.connections.implicit_trans = False self.db.connections.start() commit_test() self.db.connections.start() rollback_test() finally: self.db.connections.implicit_trans = old_implicit class Phase2_NumericTests(ZooTestsBaseClass): def test_numbers(self): float_prec = 53 box = arena.new_sandbox() try: print "precision:", # PostgreSQL should be able to go up to 1000 decimal digits (~= 2 ** 10), # but SQL constants don't actually overflow until 2 ** 15. Meh. self.db = getattr(arena.stores['testSM'], "self.db", None) if self.db: import math maxprec = self.db.typeset.numeric_max_precision if maxprec == 0: # SQLite, for example, must always use TEXT. # So we might as well try... oh... how about 3? overflow_prec = 3 else: overflow_prec = int(math.log(maxprec, 2)) + 1 else: overflow_prec = 8 dc = typerefs.decimal.getcontext() for prec in xrange(overflow_prec + 1): p = 2 ** prec print p, if p > dc.prec: dc.prec = p # We don't need to test at different 'scales'. long_done = False # Test scales at both extremes and the median for s in (0, int(prec/2), max(prec-1, 0)): s = 2 ** s # Modify the model and storage if not long_done: arena.drop_property(NothingToDoWithZoos, 'ALong') NothingToDoWithZoos.ALong.hints['bytes'] = p arena.add_property(NothingToDoWithZoos, 'ALong') if p <= float_prec: arena.drop_property(NothingToDoWithZoos, 'AFloat') NothingToDoWithZoos.AFloat.hints['precision'] = p arena.add_property(NothingToDoWithZoos, 'AFloat') if typerefs.decimal: arena.drop_property(NothingToDoWithZoos, 'ADecimal') NothingToDoWithZoos.ADecimal.hints['precision'] = p NothingToDoWithZoos.ADecimal.hints['scale'] = s arena.add_property(NothingToDoWithZoos, 'ADecimal') if typerefs.fixedpoint: arena.drop_property(NothingToDoWithZoos, 'AFixed') NothingToDoWithZoos.AFixed.hints['precision'] = p NothingToDoWithZoos.AFixed.hints['scale'] = s arena.add_property(NothingToDoWithZoos, 'AFixed') # Create an instance and set the specified precision/scale nothing = NothingToDoWithZoos() if not long_done: Lval = (16 ** p) - 1 setattr(nothing, 'ALong', Lval) if p <= float_prec: fval = float(((2 ** p) - 1) / (2 ** s)) setattr(nothing, 'AFloat', fval) nval = "1" * p nval = nval[:-s] + "." + nval[-s:] if typerefs.decimal: dval = typerefs.decimal.Decimal(nval) setattr(nothing, 'ADecimal', dval) if typerefs.fixedpoint: # fixedpoint uses "precision" where we use "scale"; # that is, number of digits after the decimal point. fpval = typerefs.fixedpoint.FixedPoint(nval, s) setattr(nothing, 'AFixed', fpval) box.memorize(nothing) # Flush and retrieve the object. Use comparisons to test # decompilation of imperfect_type when using large numbers. if not long_done: box.flush_all() nothing = box.unit(NothingToDoWithZoos, ALong=Lval) if nothing is None: self.fail("Unit not found by long property. " "prec=%s scale=%s" % (p, s)) if p <= float_prec: box.flush_all() nothing = box.unit(NothingToDoWithZoos, AFloat=fval) if nothing is None: self.fail("Unit not found by float property. " "prec=%s scale=%s" % (p, s)) if typerefs.decimal: box.flush_all() nothing = box.unit(NothingToDoWithZoos, ADecimal=dval) if nothing is None: self.fail("Unit not found by decimal property. " "prec=%s scale=%s" % (p, s)) if typerefs.fixedpoint: box.flush_all() nothing = box.unit(NothingToDoWithZoos, AFixed=fpval) if nothing is None: self.fail("Unit not found by fixedpoint property. " "prec=%s scale=%s" % (p, s)) # Test retrieved values. if not long_done: if nothing.ALong != Lval: self.fail("%s != %s prec=%s scale=%s" % (`nothing.ALong`, `Lval`, p, s)) if p <= float_prec: if nothing.AFloat != fval: self.fail("%s != %s prec=%s scale=%s" % (`nothing.AFloat`, `fval`, p, s)) if typerefs.decimal: if nothing.ADecimal != dval: self.fail("%s != %s prec=%s scale=%s" % (`nothing.ADecimal`, `dval`, p, s)) if typerefs.fixedpoint: if nothing.AFixed != fpval: self.fail("%s != %s prec=%s scale=%s" % (`nothing.AFixed`, `fpval`, p, s)) nothing.forget() box.flush_all() long_done = True finally: box.flush_all() class Phase2_ConnectionTests(ZooTestsBaseClass): def test_ConnClose(self): print "skipped (this feature is not yet implemented in all stores)", return Animal = self.schema['Animal'] # Start a sticky transaction so we use the same conn. connmgr = self.db.connections connmgr.start() # Test a normal query. self.assertEqual(len(Animal.select_all(lambda x: x.Legs == 4)), 4) # Take the connection down on our side. txkey = connmgr.id() conn = connmgr.transactions[txkey] connmgr._del_conn(conn) # Try another query. try: fourlegs = len(Animal.select_all(lambda x: x.Legs == 4)) self.assertEqual(fourlegs, 4) except: del connmgr.transactions[txkey] else: raise AssertionError("Connection did not fail.") def test_ConnUnreachable(self): if self.db.__class__.__name__ == 'SQLiteDatabase' and self.db.name == ":memory:": print "In memory :sqlite: databases cannot perform this test. Skipping" return if self.db.__class__.__name__ == 'FirebirdDatabase': test_sql = "SELECT 42 FROM rdb$database;" else: test_sql = "SELECT 42;" conn = self.db.connections.get() data, _ = self.db.fetch(test_sql, conn=conn) self.assertEqual(int(data[0][0]), 42) perform = raw_input("Disable the server and hit Enter (or 'N' to skip).") if perform.lower().startswith('n'): print "skipped", return try: self.db.fetch(test_sql, conn=conn) except: pass else: self.fail("db fetch did not raise an error.") raw_input("Enable the server and hit Enter.") data, _ = self.db.fetch(test_sql, conn=conn) self.assertEqual(int(data[0][0]), 42) class Phase2_SQLInjectionTests(ZooTestsBaseClass): debug = False def test_Typing(self): from geniusql.deparse import CannotRepresent try: data = self.db.select((self.schema['Zoo'], ['ID'], lambda z: z.Name > 12)) except ValueError, x: if "safely be translated to SQL" not in x.args[0]: raise else: self.fail("Type mismatch did not raise an error.") try: data = self.db.select((self.schema['Zoo'], lambda z: [z.Founded + 13.0])) except CannotRepresent, x: self.assert_(x.args[0].startswith("No binary function '+' between "), x.args[0]) else: self.fail("Type mismatch did not raise an error.") # Test attribute queries that are missing the containing list try: data = self.db.select((self.schema['Zoo'], lambda z: z.Founded + 13.0)) except ValueError, x: self.assertEqual(x.args[0], "Attribute AST roots must be Tuple or List, not Add") else: self.fail("Missing attribute list brackets did not raise ValueError.") def test_Escaping(self): # Test broadening of results for val in [r"\'' or 1=1", r"\'' or 1=1 or '", r"\' or 1=1 or ", r"\' or 1=1 or '", r"Ape' or 1=1 or '", r"Ape' or 1=1--", ]: data = self.db.select((self.schema['Animal'], ['ID'], lambda a: a.Species == val)) self.assertEqual(len(list(data)), 0, "%r was illegally allowed to broaden query results.") try: span = "4.0 or 1=1" data = self.db.select((self.schema['Animal'], ['ID'], lambda a: a.Lifespan == span)) except Exception, x: if self.debug: print x else: self.assertEqual(len(list(data)), 0) # Test multiple statement injection with mismatched types try: data = self.db.select((self.schema['Animal'], ['ID'], lambda a: a.Lifespan > '4; DELETE * FROM animal')) except Exception, x: if self.debug: print x else: self.fail("Injection of multiple statements did not error.") # Test multiple statement injection with bare scalars try: data = self.db.select((self.schema['Animal'], ['ID'], lambda a: a.Lifespan > 4 and 'DELETE * FROM animal')) except Exception, x: if self.debug: print x else: self.fail("Injection of multiple statements did not error. Returned %r" % list(data)) ##class ZooSchema(dejavu.Schema): ## ## # We set "latest" to 1 so we can test upgrading manually. ## latest = 1 ## ## def upgrade_to_2(self): ## self.arena.add_property(Animal, "Stomachs") ## self.arena.add_property(Animal, "ExhibitID") ## box = self.arena.new_sandbox() ## for exhibit in box.recall(Exhibit): ## for animalID in exhibit.Animals: ## # Use the Sandbox magic recaller method. ## a = box.Animal(animalID) ## if a: ## # Exhibits are identified by ZooID and Name ## a.ZooID = exhibit.ZooID ## a.ExhibitID = exhibit.Name ## box.flush_all() ## ## def upgrade_to_3(self): ## Animal.remove_property("Species") ## Animal.set_property("Family") ## ## # Note that we drop this column in a separate step from step 2. ## # If we had mixed model properties and SM properties in step 2, ## # we could have done this all in one step. But this is a better ## # demonstration of the possibilities. ;) ## Exhibit.remove_property("Animals") ## self.arena.drop_property(Exhibit, "Animals") ## ## self.arena.rename_property(Animal, "Species", "Family")