""" Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine are all _temporary_ Units. Even when you memorize them, they won't be persistent unless you set each instance's Expiration to None. If you use UnitEngine.immortalize(), it will make all of its rules immortal (no Expiration) as well. """ import threading import datetime try: import cPickle as pickle except ImportError: import pickle import dejavu from dejavu import logic import sets class UnitCollection(dejavu.Unit): """A Set of Unit identifiers. Type: Unit Type of all Units referenced by this collection. The Unit Collection is primarily for use as an index for Units. Unit Engines use Expressions and other rules to transform a Collection as a whole. These classes consume and produce Unit Collections. The Unit Collection provides special methods for iteration, whether reading or writing, to avoid errors common with multi-process/ multi-threaded access. UnitCollection is a subclass of Unit, so that it can be managed by Sandboxes. However, due to the structure of the data contained in a UnitCollection, it is recommended that Storage Managers use different techniques to store and retrieve Unit Collections. They do not need more than the ID's of their contained Units stored, since they will recall such Units as needed. Not every Storage Manager is going to be able to handle this kind of dynamic storage; deployers-- examine your Storage Managers and make sure they can! """ Members = dejavu.UnitProperty(list) EngineID = dejavu.UnitProperty(int, index=True) Type = dejavu.UnitProperty() Expiration = dejavu.UnitProperty(datetime.datetime) Timestamp = dejavu.UnitProperty(datetime.datetime) def __init__(self, **kwargs): dejavu.Unit.__init__(self) self.Members = [] self._mutex = threading.RLock() for k, v in kwargs.iteritems(): setattr(self, k, v) def __getstate__(self): return (self._properties, self._initial_property_hash) def __setstate__(self, state): self.sandbox = None self._mutex = threading.RLock() self._properties, self._initial_property_hash = state def acquire(self): self._mutex.acquire(True) def release(self): self._mutex.release() def __len__(self): return len(self.Members) def add(self, ID): self.acquire() try: if ID not in self.Members: self.Members.append(ID) finally: self.release() def unit_class(self): return self.sandbox.arena.class_by_name(self.Type) def ids(self): self.acquire() try: return self.Members[:] finally: self.release() def units(self, quota=None): cls = self.unit_class() idnames = [prop.key for prop in cls.identifiers] output = [] self.acquire() try: for i, eachID in enumerate(self.Members): if quota and i >= quota: break unit = self.sandbox.unit(cls, **dict(zip(idnames, eachID))) if unit: output.append(unit) finally: self.release() return output def xdict(self, attr): """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}.""" product = {} self.acquire() try: for unit in self.units(): key = getattr(unit, attr) product.setdefault(key, []).append(unit) finally: self.release() return product def __copy__(self): newUnit = dejavu.Unit.__copy__(self) newUnit.Members = self.Members[:] return newUnit def on_recall(self): if self.Expiration is not None: if self.Expiration <= datetime.datetime.now(): self.forget() raise dejavu.UnrecallableError else: self.decay(minutes=15) def decay(self, **kw): """decay(**kw) -> Set Expiration to now() + timedelta(**kw).""" self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw) operations = [ # OPERAND 'COPY', # SetID of mixin 'CREATE', # New type (= class.__name__) 'DIFFERENCE', # SetID of mixin 'FILTER', # logic.Expression 'FUNCTION', # key into arena.engine_functions dict 'INTERSECTION', # SetID of mixin 'RETURN', # 'TRANSFORM', # New type (= class.__name__) 'UNION', # SetID of mixin ] class RuleProperty(dejavu.UnitProperty): def __set__(self, unit, value): if self.coerce: value = self.coerce(unit, value) oldvalue = unit._properties[self.key] if oldvalue != value: unit._properties[self.key] = value if unit.sandbox: eng = unit.sandbox.unit(UnitEngine, ID=unit.EngineID) if eng: eng.update_final_class() class UnitEngineRule(dejavu.Unit): """A Rule for Unit Engines.""" Operation = RuleProperty(str) SetID = RuleProperty(int) Operand = RuleProperty(str, False, hints = {u'bytes': 0}) Sequence = RuleProperty(int) EngineID = dejavu.UnitProperty(int, index=True) Expiration = dejavu.UnitProperty(datetime.datetime) def __init__(self, **kwargs): """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet) TRANSFORM: If the Operation is 'TRANSFORM', the Operand shall be the name of a Unit type. The snapshot will consist of the identifiers of all units of that Type which are associated with the current snapshot. FILTER: If the Operation is 'FILTER', the Operand shall be a logic.Expression, and the snapshot will consist of the identifiers of Units which match the Expression. So, a typical Engine might have a set of rules which look like: --Operation-- --Set-- --Operand-- CREATE 1 Invoice # Full set FILTER 1 (Expression) # modifies Set 1 CREATE 2 Inventory # Full set FILTER 2 (Expression) # modifies Set 2 FILTER 2 (Expression) # modifies Set 2 TRANSFORM 2 Invoice # modifies Set 2 DIFFERENCE 1 2 # Set1 -= Set2 RETURN 1 # This is optional. The last RETURN statement is optional. If omitted, the last Set touched will be returned. For all operations, the Set ID indicates which Set will be modified by the operation. Using the above example, you can see that for the DIFFERENCE operation, the Set which is modified is Set 1. """ dejavu.Unit.__init__(self) if kwargs.get('Operation', '') == 'FILTER': if not isinstance(kwargs.get('Operand'), (str, unicode)): kwargs['Operand'] = pickle.dumps(kwargs['Operand']) for k, v in kwargs.iteritems(): setattr(self, k, v) def __repr__(self): op = self.Operand if self.Operation == 'FILTER': op = pickle.loads(op) return ("dejavu.engines.UnitEngineRule(%s, %s, %s)" % (self.Operation, self.SetID, repr(op))) def expr(self): """expr() -> If a FILTER rule, return the Expression, else None.""" if self.Operation == 'FILTER': op = self.Operand return pickle.loads(op) return None def on_recall(self): if self.Expiration is not None: if self.Expiration <= datetime.datetime.now(): self.forget() raise dejavu.UnrecallableError else: self.decay(minutes=15) def decay(self, **kw): """decay(**kw) -> Set Expiration to now() + timedelta(**kw).""" self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw) class UnitEngine(dejavu.Unit): """A factory for Unit Collections.""" Owner = dejavu.UnitProperty() Name = dejavu.UnitProperty() Created = dejavu.UnitProperty(datetime.datetime) FinalClassName = dejavu.UnitProperty() Expiration = dejavu.UnitProperty(datetime.datetime) def __init__(self, **kwargs): dejavu.Unit.__init__(self) self.Created = datetime.datetime.today() self.Owner = u'' for k, v in kwargs.iteritems(): setattr(self, k, v) def on_forget(self): # Rules and Snapshots shouldn't persist past # the life of their Engines. Forget them. for rule in self.rules(): rule.forget() for snap in self.snapshots(): snap.forget() def on_recall(self): if self.Expiration is not None: if self.Expiration <= datetime.datetime.now(): self.forget() raise dejavu.UnrecallableError else: self.decay(minutes=15) def decay(self, **kw): """decay(**kw) -> Set Expiration to now() + timedelta(**kw).""" self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw) def update_final_class(self): results = {} last_set = 1 for rule in self.rules(): last_set = rule.SetID operation = rule.Operation if operation in ('CREATE', 'TRANSFORM'): results[last_set] = rule.Operand if operation == 'RETURN': break if last_set in results: self.FinalClassName = results[last_set] def final_class(self): return self.sandbox.arena.class_by_name(self.FinalClassName) def rules(self): """An ordered list of Rules for this Engine.""" allrules = [x for x in self.UnitEngineRule()] allrules.sort(dejavu.sort(u'Sequence')) return allrules def add_rule(self, Operation, SetID=None, Operand=None): allrules = self.rules() if isinstance(Operation, UnitEngineRule): newRule = Operation else: if SetID is None: try: SetID = allrules[-1].SetID except IndexError: SetID = 1 newRule = UnitEngineRule(Operation=Operation, SetID=SetID, Operand=Operand) try: nextSeq = allrules[-1].Sequence + 1 except IndexError: nextSeq = 0 newRule.Sequence = nextSeq newRule.EngineID = self.ID newRule.Expiration = self.Expiration self.sandbox.memorize(newRule) self.update_final_class() def snapshots(self): """Unit Collections obtained by executing the rules sometime in the past.""" f = logic.filter(EngineID=self.ID) allSnap = self.sandbox.recall(UnitCollection, f) allSnap.sort(dejavu.sort(u'Timestamp')) return allSnap def take_snapshot(self, args={}): """Execute the rules and return a Unit Collection (or None).""" allrules = self.rules() snap = RuleProcessor(self.sandbox).process(allrules, args) if snap is not None: snap.EngineID = self.ID now = datetime.datetime.now() snap.Timestamp = now snap.decay(minutes=15) self.sandbox.memorize(snap) return snap def last_snapshot(self, args={}): allSnaps = self.snapshots() if len(allSnaps) == 0: aSnap = self.take_snapshot(args) else: aSnap = allSnaps[-1] return aSnap def immortalize(self): self.Expiration = None for rule in self.rules(): rule.Expiration = None def __copy__(self): newUnit = dejavu.Unit.__copy__(self) newUnit.Name = "Copy of %s" % newUnit.Name newUnit.Created = datetime.datetime.now() return newUnit def clone(self, user, temporary=True): """Copy self and all Rules of self. Memorize automatically.""" newUnit = self.__copy__() newUnit.Owner = user if temporary: newUnit.decay(minutes=15) else: newUnit.Expiration = None self.sandbox.memorize(newUnit) for rule in self.rules(): newRule = rule.__copy__() newRule.EngineID = newUnit.ID newRule.Expiration = newUnit.Expiration self.sandbox.memorize(newRule) return newUnit def permit(self, user, write_access=True): if write_access: return self.Owner in (u'Public', user) else: return self.Owner in ('System', 'Public', user) UnitEngine.one_to_many('ID', UnitEngineRule, 'EngineID') UnitEngine.one_to_many('ID', UnitCollection, 'EngineID') class RuleProcessor(object): """Processor for the Rules of a Unit Engine.""" def __init__(self, sandbox): self.sandbox = sandbox self.arena = sandbox.arena def process(self, rules, args): """Execute the rules and return a Unit Collection (or None).""" self.sets = {} self.args = args final = None for rule in rules: operation = rule.Operation func = getattr(self, 'visit_' + operation) final = rule.SetID func(final, rule.Operand) if final is None: return None else: return self.sets[final] def visit_COPY(self, setID, operand): """Copy the set whose ID = operand into another set, whose ID = setID.""" A = self.sets[setID] setID2 = int(operand) if setID2 in self.sets: # Overwrite the existing set. B = self.sets[setID2] else: # Create a new set. B = UnitCollection(Type=A.Type) self.sets[setID2] = B B.universal = A.universal A.acquire() B.acquire() try: B.Members = A.Members[:] finally: A.release() B.release() def visit_CREATE(self, setID, operand): """Create a universal set. Actual population may be deferred.""" newset = UnitCollection(Type=operand) newset.universal = True self.sets[setID] = newset def visit_DIFFERENCE(self, setID, operand): A = self.sets[setID] setID2 = int(operand) self.realize_universal(setID2) B = self.sets[setID2] A.acquire() B.acquire() try: if B.universal: # B should be every Unit, which means the difference # will always be an empty set. A.Members = [] else: # B is a subset of A. if A.universal: cls = self.arena.class_by_name(A.Type) mem = A.Members for unit in self.sandbox.recall(cls): id = unit.identity() if id not in B.Members: mem.append(id) else: A.Members = [x for x in A.Members if x not in B.Members] A.universal = False finally: A.release() B.release() def visit_FILTER(self, setID, operand): expr = pickle.loads(operand) expr.bind_args(**self.args) A = self.sets[setID] A.acquire() try: cls = self.arena.class_by_name(A.Type) mem = A.Members if A.universal: A.universal = False for unit in self.sandbox.recall(cls, expr): id = unit.identity() if id not in mem: mem.append(id) else: newset = [] idnames = [prop.key for prop in cls.identifiers] for id in mem: unit = self.sandbox.unit(cls, **dict(zip(idnames, id))) if unit and expr(unit): newset.append(id) A.Members = newset finally: A.release() def visit_FUNCTION(self, setID, operand): func = self.arena.engine_functions[operand] A = self.sets[setID] A.acquire() try: # Notice we do not populate universals before passing to func. func(self.sandbox, A) finally: A.release() def visit_INTERSECTION(self, setID, operand): A = self.sets[setID] setID2 = int(operand) B = self.sets[setID2] A.acquire() B.acquire() try: if B.universal: # If A is universal, (A and B) = universal. Defer. # If A is not universal, (A and B) = A. Pass. pass else: if A.universal: # B is a subset of A. (A and B) = B. Copy B to A. A.Members = B.Members[:] A.universal = False else: A.Members = [x for x in A.Members if x in B.Members] finally: A.release() B.release() def visit_RETURN(self, setID, operand): self.realize_universal(setID) def realize_universal(self, setID): A = self.sets[setID] if A.universal: A.acquire() try: A.universal = False cls = self.arena.class_by_name(A.Type) mem = A.Members for unit in self.sandbox.recall(cls): mem.append(unit.identity()) finally: A.release() def visit_TRANSFORM(self, setID, operand): """visit_TRANSFORM(setID, operand=farClass name). Multiple hops OK.""" A = self.sets[setID] start = self.arena.class_by_name(A.Type) end = self.arena.class_by_name(operand) nodes = self.arena.associations.shortest_path(start, end) if nodes is None: raise KeyError("No association found between '%s' and '%s'" % (start, end)) # Skip the first node, which should be A.Type nodes.pop(0) A.acquire() try: for eachType in nodes: # Add all associated Units to the collection A. ua = start._associations[eachType.__name__] cls = self.arena.class_by_name(A.Type) newset = [] if A.universal: for unit in self.sandbox.recall(cls): farUnits = ua.__get__(unit)() if not ua.to_many: if farUnits is None: farUnits = [] else: farUnits = [farUnits] for farUnit in farUnits: farid = farUnit.identity() if farid not in newset: newset.append(farid) A.universal = False else: idnames = [prop.key for prop in cls.identifiers] for id in A.Members: unit = self.sandbox.unit(cls, **dict(zip(idnames, id))) if unit: farUnits = ua.__get__(unit)() if not ua.to_many: if farUnits is None: farUnits = [] else: farUnits = [farUnits] for farUnit in farUnits: farid = farUnit.identity() if farid not in newset: newset.append(farid) A.Members = newset start = eachType A.Type = eachType.__name__ finally: A.release() def visit_UNION(self, setID, operand): A = self.sets[setID] setID2 = int(operand) B = self.sets[setID2] A.acquire() B.acquire() try: if B.universal: # (A or B) = Universal set. Make A universal. A.universal = True A.Members = [] else: if A.universal: pass else: amem = A.Members for id in B.Members: if id not in amem: amem.append(id) finally: A.release() B.release() def register_classes(arena): arena.register(UnitCollection) arena.register(UnitEngine) arena.register(UnitEngineRule)