import ConfigParser import threading from types import ClassType from dejavu.containers import Graph from dejavu import logic, errors, xray __all__ = ['Arena', 'Sandbox', 'logflags', ] class Enum(object): pass # logging flags (see Arena.logflags) logflags = Enum() logflags.ERROR = 1 logflags.IO = 2 logflags.SQL = 4 logflags.MEMORIZE = 128 logflags.RECALL = 256 logflags.VIEW = 512 logflags.REPRESS = 1024 logflags.FORGET = 2048 logflags.SANDBOX = (logflags.MEMORIZE | logflags.RECALL | logflags.VIEW | logflags.REPRESS | logflags.FORGET) class Arena(object): """A namespace/workspace for a Dejavu application.""" def __init__(self): self._sync_lock = threading.Lock() self.stores = {} self._registered_classes = {} self.associations = Graph(directed=False) self.engine_functions = {} self.logflags = logflags.ERROR + logflags.IO def log(self, message): """Default logger (writes to stdout). Feel free to replace.""" if isinstance(message, unicode): print message.encode('utf8') else: print message def load(self, configFileName): """Load StorageManagers.""" parser = ConfigParser.ConfigParser() # Make names case-sensitive by overriding optionxform. parser.optionxform = unicode parser.read(configFileName) stores = [] for section in parser.sections(): opts = dict(parser.items(section)) stores.append((int(opts.get("Load Order", "0")), section, opts)) stores.sort() for order, name, options in stores: self.add_store(name, options[u'Class'], options) def add_store(self, name, store, options=None): """Register a StorageManager. The 'store' argument may be a StorageManager class, an instance of that class, or the full importable dotted-package name of the class. """ if isinstance(store, basestring): store = xray.classes(store)(self, options or {}) elif isinstance(store, (type, ClassType)): store = store(self, options or {}) self.stores[name] = store return store def remove_store(self, name): if name in self.stores: store = self.stores[name] # Disassociate all registered classes with this store. for c in self._registered_classes.keys(): if self._registered_classes[c] is store: self._registered_classes[c] = None del self.stores[name] def shutdown(self): """Shutdown the arena.""" # Tell all stores to shut down. stores = [(v.shutdownOrder, v, k) for k, v in self.stores.iteritems()] stores.sort() for order, store, name in stores: store.shutdown() def new_sandbox(self): """Return a new sandbox object in this Arena.""" return Sandbox(self) # --------------------- Unit Class Registration --------------------- # def register(self, cls): """Assert that Units of class 'cls' will be handled.""" # We must allow modules to register classes before any stores have # been added, but not overwrite a store which has already been found. if cls not in self._registered_classes: self._registered_classes[cls] = None # Register any association(s) in an undirected graph. for ua in cls._associations.itervalues(): if getattr(ua, "register", True): self.associations.connect(cls, ua.farClass) def register_all(self, globals): import dejavu seen = {} for obj in globals.itervalues(): if isinstance(obj, type) and issubclass(obj, dejavu.Unit): self.register(obj) seen[obj] = None return seen.keys() def class_by_name(self, classname): for cls in self._registered_classes: if cls.__name__ == classname: return cls raise KeyError("No registered class found for '%s'." % classname) def storage(self, cls): """Return the StorageManager which handles Units of the given class.""" store = self._registered_classes.get(cls) if store: return store self._sync_lock.acquire() try: # Search all stores for the class name. default_store = None clsname = cls.__name__ for store in self.stores.values(): if not store.classnames: # This store has no "classnames" list, which signals that it # handles all classes which are not handled by other stores. default_store = store elif clsname in store.classnames: store.sync([cls]) self._registered_classes[cls] = store return store # Name not found in any store's classnames. Try a default store. if default_store: default_store.sync([cls]) self._registered_classes[cls] = default_store return default_store finally: self._sync_lock.release() raise KeyError("No store found for '%s'." % clsname) def create_storage(self, cls): """Create storage space for cls.""" self.storage(cls).create_storage(cls) def has_storage(self, cls): return self.storage(cls).has_storage(cls) def drop_storage(self, cls): self.storage(cls).drop_storage(cls) def add_property(self, cls, name): self.storage(cls).add_property(cls, name) def drop_property(self, cls, name): self.storage(cls).drop_property(cls, name) def rename_property(self, cls, oldname, newname): self.storage(cls).rename_property(cls, oldname, newname) def migrate_class(self, cls, new_store): """Copy all units of cls to new_store.""" new_store.create_storage(cls) for unit in self.new_sandbox().xrecall(cls): new_store.reserve(unit) new_store.save(unit, True) def migrate(self, new_store, old_store=None, copy_only=False): """Copy all units (of old_store) to new_store.""" for cls in self._registered_classes: store = self.storage(cls) if old_store is None or old_store is store: self.migrate_class(cls, new_store) if not copy_only: self._registered_classes[cls] = new_store ########################################################################### ## ## ## Sandboxes ## ## ## ########################################################################### class Sandbox(object): """Data sandbox for Dejavu arenas. Each consumer (that is, each UI process or thread) maintains a Sandbox for managing Units. Sandboxes populate themselves with Units on a lazy basis, allowing UI code to request data as it's needed. However, once obtained, such Units are persisted (usually for the lifetime of the thread); this important detail means that multiple requests for the same Units result in multiple references to the same objects, rather than multiple objects. Sandboxes are basically what Fowler calls Identity Maps. The *REALLY* important thing to understand if you're customizing this is that Sandboxes won't survive sharing across threads--DON'T TRY IT. If you need to share unit data across requests, use or make an SM which persists the data, and chain it with another, more normal SM. _cache(), _caches, and _stores are private for a reason--don't access them from interface code--tell the Sandbox to do it for you. Starting with Python 2.5, each Sandbox instance is its own context manager, so you can have boxes automatically flush themselves when you're done, and automatically rollback on error. Example: # __future__ only needed for Python 2.5, not 2.6+ from __future__ import with_statement with arena.new_sandbox() as box: WAP = box.unit(Zoo, Name='Wild Animal Park') WAP.Opens = now """ def __init__(self, arena): self.arena = arena self._caches = {} def __getattr__(self, key): # Support "magic recaller" methods on self. for cls in self.arena._registered_classes.iterkeys(): name = cls.__name__ if name == key: def recaller(*args, **kwargs): # Allow identifiers to be supplied as args or kwargs # (since the common case will be a single identifier). for arg, key in zip(args, cls.identifiers): kwargs[str(key)] = arg expr = logic.filter(**kwargs) try: return self.xrecall(cls, expr).next() except StopIteration: return None recaller.__doc__ = "A single %s Unit, else None." % name return recaller raise AttributeError("Sandbox object has no attribute '%s'" % key) def memorize(self, unit): """Persist unit in storage.""" cls = unit.__class__ unit.sandbox = self # Ask the store to accept the unit, assigning it primary key values # if necessary. The store should also call unit.cleanse() if it # saves the whole unit state on this call. self.arena.storage(cls).reserve(unit) # Insert the unit into the cache. id = unit.identity() self._cache(cls)[id] = unit if self.arena.logflags & logflags.MEMORIZE: self.arena.log("MEMORIZE %s: %s" % (cls.__name__, id)) # Do this at the end of the func, since most on_memorize # will want to have an identity when called. if hasattr(unit, "on_memorize"): unit.on_memorize() def forget(self, unit): """Destroy unit, both in the cache and storage.""" cls = unit.__class__ id = unit.identity() if self.arena.logflags & logflags.FORGET: self.arena.log("FORGET %s: %s" % (cls.__name__, id)) self.arena.storage(cls).destroy(unit) del self._cache(cls)[id] # This must be done after the destroy() call, so that a # related unit can poll all instances of this class. if hasattr(unit, "on_forget"): unit.on_forget() unit.sandbox = None def xrecall(self, classes, expr=None, inherit=False, **kwargs): """Iterator over units of cls which match expr. If inherit is True, units of the given class and all registered subclasses of the given class will be recalled. """ if classes.__class__.__name__ == "UnitJoin": for unitrow in self.xmulti(classes, expr, **kwargs): yield unitrow return cls = classes if expr and not isinstance(expr, logic.Expression): expr = logic.Expression(expr) if kwargs: f = logic.filter(**kwargs) if expr: expr += f else: expr = f if self.arena.logflags & logflags.RECALL: self.arena.log("RECALL %s: %s" % (cls.__name__, expr)) if inherit: # Collect all registered subclasses of cls. # Note that cls is a subclass of itself. classes = [c for c in self.arena._registered_classes.iterkeys() if issubclass(c, cls)] else: classes = [cls] if not classes: # Even the requested class is not registered. raise errors.UnrecallableError("The '%s' class is not registered." % cls.__name__) for cls in classes: cache = self._cache(cls) # Special-case the scenario where one Unit is expected # and called by ID. We should be able to save a database hit. if expr: fc = expr.func.func_code if (fc.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S' and fc.co_names[-1] == 'ID'): ID = fc.co_consts[-1] unit = cache.get((ID,)) if unit is not None: # Do NOT call on_recall here. That should be called # only at the Sandbox-SM boundary. yield unit return # Query the cache. We have to use a static copy of the # keys, to ensure that our cache doesn't change size # during iteration (due to overlapping xrecalls). keys = cache.keys() for id in keys: unit = cache.get(id) if unit and ((expr is None) or expr.evaluate(unit)): # Do NOT call on_recall here. That should be called # only at the Sandbox-SM boundary. yield unit # Query Storage. for unit in self.arena.storage(cls).recall(cls, expr): id = unit.identity() # Don't offer up a unit that was already checked in our cache # (whether it matched the expr() or not--we assume the cache # has the freshest data). if id not in keys: # Very important that we check for existing unit, as its # state may have changed in memory but not in storage # (even between our cache yields and this yield). # Make sure the cache lookup and get happens atomically. existing = cache.get(id) if existing: yield existing else: unit.sandbox = self confirmed = True cache[id] = unit if hasattr(unit, 'on_recall'): try: unit.on_recall() except errors.UnrecallableError: confirmed = False if confirmed: yield unit def recall(self, classes, expr=None, inherit=False, **kwargs): """List of units of the given class which match expr. If inherit is True, units of the given class and all registered subclasses of the given class will be recalled. """ return [x for x in self.xrecall(classes, expr, inherit, **kwargs)] def xmulti(self, classes, expr=None, **kwargs): """Recall units of each cls if they together match the expr. Each yielded value will be a list of Units, in the same order as the classes arg. This facilitates unpacking in iterative consumer code like: for invoice, price in sandbox.xmulti(Invoice & Price, f): deal_with(invoice) deal_with(price) """ if expr and not isinstance(expr, logic.Expression): expr = logic.Expression(expr) if kwargs: f = logic.filter(**kwargs) if expr: expr += f else: expr = f if self.arena.logflags & logflags.RECALL: self.arena.log("RECALL %s %s" % (", ".join([c.__name__ for c in classes]), expr)) stores = [self.arena.storage(cls) for cls in classes] firststore = stores[0] for s in stores: if s is not firststore: raise ValueError(u"xmulti() does not support multiple" u" classes in disparate stores.") # This is broken. If a filter expr is supplied, then the store may # not return rows which our cache would, and those won't be included # in the resultset. If you're using xmulti with no expr's, or # in read-only scripts, it should be OK for now. But if you mutate # Units and then call multirecall, expect inconsistent results. for unitset in firststore.multirecall(classes, expr): confirmed = True for index in xrange(len(unitset)): unit = unitset[index] id = unit.identity() cache = self._cache(unit.__class__) if id in cache: # Keep the unit which is in our cache! unitset[index] = cache[id] else: cache[id] = unit unit.sandbox = self if hasattr(unit, 'on_recall'): try: unit.on_recall() except errors.UnrecallableError: confirmed = False break if confirmed: yield unitset def unit(self, cls, expr=None, inherit=False, **kwargs): """A single matching Unit, else None. **kwargs will be combined into an Expression via logic.filter. The first Unit matching that expression is returned; if no Units match, None is returned. """ try: return self.xrecall(cls, expr, inherit, **kwargs).next() except StopIteration: return None def view(self, cls, attrs, expr=None, **kwargs): """Iterator of all Property tuples.""" if expr and not isinstance(expr, logic.Expression): expr = logic.Expression(expr) if kwargs: f = logic.filter(**kwargs) if expr: expr += f else: expr = f if self.arena.logflags & logflags.VIEW: self.arena.log("VIEW %s [%s]: %s" % (cls.__name__, attrs, expr)) cache = self._cache(cls) for unit in cache.itervalues(): if expr is None or expr(unit): yield tuple([getattr(unit, attr) for attr in attrs]) # Add the identity attribute(s) if not present. This is necessary # to avoid duplicating objects which are already in our cache. fields = list(attrs) indices = [] added_fields = 0 for key in cls.identifiers: if key not in fields: added_fields += 1 fields.append(key) indices.append(fields.index(key)) for row in self.arena.storage(cls).view(cls, fields, expr): id = tuple([row[x] for x in indices]) if id not in cache: if added_fields: # Remove the added identifier columns from the row. row = row[:len(row) - added_fields] yield row def sum(self, cls, attr, expr=None, **kwargs): """Sum of all non-None values for the given cls.attr.""" expr = logic.Expression(lambda x: getattr(x, attr) != None) + expr return sum([row[0] for row in self.view(cls, (attr,), expr, **kwargs)]) def distinct(self, cls, attrs, expr=None, **kwargs): """List of distinct Property tuples. If only one attribute is specified, a list of values will be returned. If more than one attribute is specified, a zipped list will be returned. Notice that you can also use this function as a count() function (in fact it's the only way to do it) by using attrs = ['ID']. """ if expr and not isinstance(expr, logic.Expression): expr = logic.Expression(expr) if kwargs: f = logic.filter(**kwargs) if expr: expr += f else: expr = f if self.arena.logflags & logflags.VIEW: self.arena.log("DISTINCT %s [%s]: %s" % (cls.__name__, attrs, expr)) seen = {} cache = self._cache(cls) for unit in cache.itervalues(): if expr is None or expr(unit): row = tuple([getattr(unit, attr) for attr in attrs]) if row not in seen: seen[row] = None for row in self.arena.storage(cls).distinct(cls, attrs, expr): if row not in seen: seen[row] = None seen = seen.keys() seen.sort() if len(attrs) == 1: seen = [x[0] for x in seen] return seen def count(self, cls, expr): """Number of Units of class 'cls'.""" return len(self.distinct(cls, cls.identifiers, expr)) def range(self, cls, attr, expr=None, **kwargs): """Distinct, non-None attr values (ordered and continuous, if possible). If the given attribute is a known discrete, ordered type (like int, long, datetime.date), this returns the closed interval: [min(attr), ..., max(attr)] That is, all possible values will be output between min and max, even if they do not appear in the dataset. """ existing = [x for x in self.distinct(cls, [attr], expr, **kwargs) if x is not None] if not existing: return [] attr_type = getattr(cls, attr).type if issubclass(attr_type, (int, long)): return range(min(existing), max(existing) + 1) else: try: import datetime except ImportError: pass else: if issubclass(attr_type, datetime.date): def date_gen(): start, end = min(existing), max(existing) for d in range((end + 1) - start): yield start + datetime.timedelta(d) return date_gen() try: existing.sort() except TypeError: pass return existing # Cache Management # def _cache(self, cls): """Return the cache for the specified class. This base class creates a new cache for each cls per request. """ if cls not in self._caches: self._caches[cls] = {} return self._caches[cls] def purge(self, cls): """Drop all cached Units of class 'cls'. Do not save.""" del self._caches[cls] def repress(self, unit): """Remove unit from cache (but don't destroy).""" cls = unit.__class__ id = unit.identity() if self.arena.logflags & logflags.REPRESS: self.arena.log("REPRESS %s: %s" % (cls.__name__, id)) if hasattr(unit, "on_repress"): unit.on_repress() # Save after on_repress in case on_repress modified the unit. self.arena.storage(cls).save(unit) del self._cache(cls)[id] unit.sandbox = None def flush_all(self): """Repress all units.""" for cls in self._caches.keys(): # Call all on_repress methods first! There are truly horrible # interdependency chains in most on_repress methods, and # it's best to resolve them all at once BEFORE flushing # any units from the cache. # Note we use values instead of itervalues, since the # cache may change size during iteration. for unit in self._cache(cls).values(): if hasattr(unit, "on_repress"): unit.on_repress() seen_stores = {} for cls in self._caches.keys(): cache = self._cache(cls) store = self.arena.storage(cls) seen_stores[store] = None while cache: unitid, unit = cache.popitem() if self.arena.logflags & logflags.REPRESS: self.arena.log("REPRESS %s: %s" % (cls.__name__, unitid)) store.save(unit) for store in seen_stores: try: commit = store.db.commit except AttributeError: pass else: commit() def rollback(self): """Ignore all changes and purge our cache.""" seen_stores = {} for cls in self._caches.keys(): self.purge(cls) seen_stores[self.arena.storage(cls)] = None for store in seen_stores: try: rollback = store.db.rollback except AttributeError: pass else: rollback() # Context Management # def __enter__(self): return self def __exit__ (self, type, value, tb): if tb is None: self.flush_all() else: self.rollback()