""" Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine are all _temporary_ Units. Even when you memorize them, they won't be persistent unless you mark each instance as no longer temporary. If you use UnitEngine.permanent(), it will make all of its rules permanent (not temporary) as well. """ import threading import datetime try: import cPickle as pickle except ImportError: import pickle import dejavu import logic import sets import xray class UnitCollection(dejavu.Unit): """A Set of Unit IDs. Type: Unit Type of all Units referenced by this collection. The Unit Collection is primarily for use as an index for Units. Unit Engines use Expressions and other rules to transform a Collection as a whole. These classes consume and produce Unit Collections. The Unit Collection provides special methods for iteration, whether reading or writing, to avoid errors common with multi-process/ multi-threaded access. UnitCollection is a subclass of Unit, so that it can be managed by Sandboxes. However, due to the structure of the data contained in a UnitCollection, it is recommended that Storage Managers use different techniques to store and retrieve Unit Collections. They do not need more than the ID's of their contained Units stored, since they will recall such Units as needed. Not every Storage Manager is going to be able to handle this kind of dynamic storage; deployers-- examine your Storage Managers and make sure they can! """ _IDs = None def __init__(self, **kwargs): dejavu.Unit.__init__(self) self._IDs = sets.Set() self._mutex = threading.RLock() for k, v in kwargs.iteritems(): setattr(self, k, v) def acquire(self): self._mutex.acquire(True) def release(self): self._mutex.release() def __len__(self): return len(self._IDs) def add(self, ID): self.acquire() try: self._IDs.add(ID) finally: self.release() def unit_class(self): return self.sandbox.arena.class_by_name(self.Type) def ids(self): self.acquire() for eachID in self._IDs: yield eachID self.release() def units(self, quota=None): cls = self.unit_class() self.acquire() for i, eachID in enumerate(self._IDs): if quota and i >= quota: break unit = self.sandbox.unit(cls, ID=eachID) if unit: yield unit self.release() def xdict(self, attr): """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}.""" product = {} self.acquire() try: for unit in self.units(): key = getattr(unit, attr) product.setdefault(key, []).append(unit) finally: self.release() return product def __copy__(self): newUnit = dejavu.Unit.__copy__(self) newUnit._IDs = self._IDs.copy() return newUnit UnitCollection.set_property(u'EngineID', int, index=True) UnitCollection.set_properties({u'Type': unicode, u'Timestamp': datetime.datetime, }) operations = [ # OPERAND 'COPY', # SetID of mixin 'CREATE', # New type (= class.__name__) 'DIFFERENCE', # SetID of mixin 'FILTER', # logic.Expression 'FUNCTION', # key into arena.engine_functions dict 'INTERSECTION', # SetID of mixin 'RETURN', # 'TRANSFORM', # New type (= class.__name__) 'UNION', # SetID of mixin ] class UnitEngineRule(dejavu.Unit): """A Rule for Unit Engines.""" def __init__(self, **kwargs): """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet) Expressions: If the Operation is a logic.Expression, then the snapshot will consist of the IDs of units which match the Expression. Everything else: transforms: the snapshot will consist of IDs of all units which are associated with the current snapshot. union, difference, and intersection: these all take a setID. So, a typical Engine might have a set of rules which look like: --Operation-- --Set-- --Operand-- CREATE 1 Invoice # Full set FILTER 1 (Expression) # modifies Set 1 CREATE 2 Inventory # Full set FILTER 2 (Expression) # modifies Set 2 FILTER 2 (Expression) # modifies Set 2 TRANSFORM 2 Invoice # modifies Set 2 DIFFERENCE 1 2 # Set1 -= Set2 RETURN 1 # This is optional. The last RETURN statement is optional. If omitted, the last Set touched will be returned. For all operations, the Set ID indicates which Set will be modified by the operation. Using the above example, you can see that for the DIFFERENCE operation, the Set which is modified is Set 1. """ dejavu.Unit.__init__(self) if kwargs.get('Operation', '') == 'FILTER': if not isinstance(kwargs.get('Operand'), (str, unicode)): kwargs['Operand'] = pickle.dumps(kwargs['Operand']) for k, v in kwargs.iteritems(): setattr(self, k, v) def __repr__(self): op = self.Operand if self.Operation == 'FILTER': op = pickle.loads(op) return ("dejavu.engines.UnitEngineRule(%s, %s, %s)" % (self.Operation, self.SetID, repr(op))) def expr(self): if self.Operation == 'FILTER': op = self.Operand return pickle.loads(op) return None class RuleProperty(dejavu.UnitProperty): def post(self, unit, value): eng = unit.sandbox.unit(UnitEngine, ID=unit.EngineID) if eng: eng.update_final_class() UnitEngineRule.set_property(u'Operation', str, descriptor=RuleProperty) UnitEngineRule.set_property(u'SetID', int, descriptor=RuleProperty) UnitEngineRule.set_property(u'Operand', str, descriptor=RuleProperty) UnitEngineRule.Operand.hints = {u'Size': 0} UnitEngineRule.set_property(u'Sequence', int, descriptor=RuleProperty) UnitEngineRule.set_property(u'EngineID', int, index=True) class UnitEngine(dejavu.Unit): """A factory for Unit Collections.""" def __init__(self, **kwargs): dejavu.Unit.__init__(self) self.Created = datetime.datetime.today() self.Owner = u'' for k, v in kwargs.iteritems(): setattr(self, k, v) def on_forget(self): # Rules and Snapshots shouldn't persist past # the life of their Engines. Forget them. for rule in self.rules(): rule.forget() for snap in self.snapshots(): snap.forget() def update_final_class(self): results = {} last_set = 1 for rule in self.rules(): last_set = rule.SetID operation = rule.Operation if operation in ('CREATE', 'TRANSFORM'): results[last_set] = rule.Operand if operation == 'RETURN': break if last_set in results: self.FinalClassName = results[last_set] def final_class(self): return self.sandbox.arena.class_by_name(self.FinalClassName) def rules(self): """An ordered list of Rules for this Engine.""" f = logic.filter(EngineID=self.ID) allrules = [x for x in self.sandbox.recall(UnitEngineRule, f)] allrules.sort(dejavu.sort(u'Sequence')) return allrules def add_rule(self, Operation, SetID=None, Operand=None): allrules = self.rules() if isinstance(Operation, UnitEngineRule): newRule = Operation else: if SetID is None: try: SetID = allrules[-1].SetID except IndexError: SetID = 1 newRule = UnitEngineRule(Operation=Operation, SetID=SetID, Operand=Operand) try: nextSeq = allrules[-1].Sequence + 1 except IndexError: nextSeq = 0 newRule.Sequence = nextSeq newRule.EngineID = self.ID newRule.temporary = self.temporary self.sandbox.memorize(newRule) self.update_final_class() def snapshots(self): """Unit Collections obtained by executing the rules sometime in the past.""" f = logic.filter(EngineID=self.ID) allSnap = [x for x in self.sandbox.recall(UnitCollection, f)] allSnap.sort(dejavu.sort(u'Timestamp')) return allSnap def take_snapshot(self, args={}): """Execute the rules and return a Unit Collection (or None).""" allrules = self.rules() snap = RuleProcessor(self.sandbox).process(allrules, args) if snap is not None: snap.EngineID = self.ID snap.Timestamp = datetime.datetime.now() snap.temporary = True self.sandbox.memorize(snap) return snap def last_snapshot(self, args={}): allSnaps = self.snapshots() if len(allSnaps) == 0: aSnap = self.take_snapshot(args) else: aSnap = allSnaps[-1] return aSnap def permanent(self): self.temporary = False for rule in self.rules(): rule.temporary = False def __copy__(self): newUnit = dejavu.Unit.__copy__(self) newUnit.Name = "Copy of %s" % newUnit.Name newUnit.Created = datetime.datetime.now() return newUnit def clone(self, user, temporary=True): """Copy self and all Rules of self. Memorize automatically.""" newUnit = self.__copy__() newUnit.Owner = user newUnit.temporary = temporary self.sandbox.memorize(newUnit) for rule in self.rules(): newRule = rule.__copy__() newRule.EngineID = newUnit.ID newRule.temporary = temporary self.sandbox.memorize(newRule) return newUnit def permit(self, user, write_access=True): if write_access: return self.Owner in (u'Public', user) else: return self.Owner in ('System', 'Public', user) UnitEngine.set_properties({u'Owner': unicode, u'Name': unicode, u'Created': datetime.datetime, u'FinalClassName': unicode, }) class RuleProcessor(object): """Processor for the Rules of a Unit Engine.""" def __init__(self, sandbox): self.sandbox = sandbox self.arena = sandbox.arena def process(self, rules, args): """Execute the rules and return a Unit Collection (or None).""" self.sets = {} self.args = args final = None for rule in rules: operation = rule.Operation func = getattr(self, 'visit_' + operation) final = rule.SetID func(final, rule.Operand) if final is None: return None else: return self.sets[final] def visit_COPY(self, setID, operand): """Copy the set whose ID = operand into another set, whose ID = setID.""" A = self.sets[setID] operand = int(operand) if operand in self.sets: # Overwrite the existing set. B = self.sets[operand] else: # Create a new set. B = UnitCollection(Type=A.Type) self.sets[operand] = B B.empty = A.empty A.acquire() B.acquire() try: B._IDs = A._IDs.copy() finally: A.release() B.release() def visit_CREATE(self, setID, operand): """Create an empty set. The next instruction is responsible to fill it.""" newset = UnitCollection(Type=operand) newset.empty = True self.sets[setID] = newset def realize_empty(self, setID): """realize_empty(setID). Populate the specified set only if empty.""" A = self.sets[setID] if hasattr(A, 'empty') and A.empty: A.empty = False A.acquire() try: for unit in self.sandbox.recall(self.arena.class_by_name(A.Type)): A._IDs.add(unit.ID) finally: A.release() def visit_DIFFERENCE(self, setID, operand): self.realize_empty(setID) A = self.sets[setID] B = self.sets[int(operand)] A.acquire() B.acquire() try: A._IDs = A._IDs.difference(B._IDs) finally: A.release() B.release() def visit_FILTER(self, setID, operand): expr = pickle.loads(operand) expr.bind_args(**self.args) A = self.sets[setID] if hasattr(A, 'empty') and A.empty: A.empty = False A.acquire() try: cls = self.arena.class_by_name(A.Type) for unit in self.sandbox.recall(cls, expr): A._IDs.add(unit.ID) finally: A.release() else: A.acquire() try: cls = self.arena.class_by_name(A.Type) newset = sets.Set() for id in A._IDs: unit = self.sandbox.unit(cls, ID=id) if unit and expr.evaluate(unit): newset.add(id) A._IDs = newset finally: A.release() def visit_FUNCTION(self, setID, operand): func = self.arena.engine_functions[operand] A = self.sets[setID] A.acquire() try: func(self.sandbox, A) finally: A.release() def visit_INTERSECTION(self, setID, operand): self.realize_empty(setID) A = self.sets[setID] B = self.sets[int(operand)] A.acquire() B.acquire() try: A._IDs = A._IDs.intersection(B._IDs) finally: A.release() B.release() def visit_RETURN(self, setID, operand): self.realize_empty(setID) def visit_TRANSFORM(self, setID, operand): """operand=far class name. Multiple hops are supported.""" self.realize_empty(setID) A = self.sets[setID] start = self.arena.class_by_name(A.Type) end = self.arena.class_by_name(operand) nodes = self.arena.associations.shortest_path(start, end) if nodes is None: raise KeyError("No association found between '%s' and '%s'" % (start, end)) # Skip the first node, which should be A.Type nodes.pop(0) A.acquire() try: for eachType in nodes: # Add all associated Units to the collection A. oppfunc = getattr(start, eachType.__name__) cls = self.arena.class_by_name(A.Type) newset = sets.Set() for id in A._IDs: unit = self.sandbox.unit(cls, ID=id) if unit: for farUnit in oppfunc(unit): newset.add(farUnit.ID) A._IDs = newset start = eachType A.Type = eachType.__name__ finally: A.release() def visit_UNION(self, setID, operand): self.realize_empty(setID) A = self.sets[setID] B = self.sets[int(operand)] A.acquire() B.acquire() try: A._IDs = A._IDs.union(B._IDs) finally: A.release() B.release()