"""Useful container classes. According to Stroustrup: (http://www.research.att.com/~bs/glossary.html#Gcontainer) container - (1) object that holds other objects. (2) type of object that holds other objects. (3) template that generates types of objects that hold other objects. (4) standard library template such as vector, list, and map. Find herein the containers: warehouse, Graph, Index, and Prism. This work, including the source code, documentation and related data, is placed into the public domain. The orginal author is Robert Brewer, Amor Ministries. THIS SOFTWARE IS PROVIDED AS-IS, WITHOUT WARRANTY OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE RESULTING FROM THE USE, MODIFICATION, OR REDISTRIBUTION OF THIS SOFTWARE. """ import new def warehouse(stock, factory=None): """warehouse(stock, factory=None) -> iavailable, iremainder. Iterate over stock, extending as needed. Once the 'stock' sequence is exhausted, the factory function is called to produce a new valid object upon each subsequent call to next(). If factory is None, the class of the first item in the sequence is used as a constructor. Otherwise, the factory function does not receive any arguments, so if your class has mandatory arguments to __init__, wrap the class in a function which can supply those. The most common use for warehouse is to reuse a set of existing objects, often because object creation and/or destruction is expensive. Example: available, remainder = warehouse(seq) for line in order: line.use(available.next()) for item in remainder: item.close() """ stock = iter(stock) def pull(): for item in stock: yield item if factory is None: try: local_factory = item.__class__ except NameError: raise ValueError("Empty sequence and no factory supplied.") else: local_factory = factory while True: yield local_factory() return pull(), stock class Graph(dict): """An unweighted graph. Can be directed or undirected.""" def __init__(self, data={}, directed=False): self.update(data) self.directed = directed self._cached_paths = {} def add(self, node): if node not in self: self[node] = [] self._cached_paths = {} def _make_arc(self, node, othernode): bucket = self.setdefault(node, []) if othernode not in bucket: bucket.append(othernode) def connect(self, node, othernodes): """Create an edge between node and each node in othernodes. Examples: Graph().connect('A', ('B', 'C', 'D')) becomes: --B / A---C \ --D Graph(directed=True).connect('A', ('B', 'C', 'D')) becomes: ->B / A-->C \ ->D """ try: othernodes = iter(othernodes) except TypeError: othernodes = (othernodes, ) for othernode in othernodes: self._make_arc(node, othernode) if not self.directed: self._make_arc(othernode, node) self._cached_paths = {} def chain(self, *nodes): """Create an edge between each node in sequence. Examples: Graph().chain('A', 'B', 'C', 'D') becomes: A--B--C--D Graph(directed=True).chain('A', 'B', 'C', 'D') becomes: A-->B-->C-->D """ node = nodes[0] for nextnode in nodes[1:]: self._make_arc(node, nextnode) if not self.directed: self._make_arc(nextnode, node) # Lather, rinse, repeat node = nextnode self._cached_paths = {} def shortest_path(self, start, end): """A list of nodes, or None if start is valid but no path is found.""" key = (start, end) try: return self._cached_paths[key][:] except KeyError: shortest = self._shortest_path(start, end) self._cached_paths[key] = shortest if shortest is not None: shortest = shortest[:] return shortest def _shortest_path(self, start, end, path=[]): """A list of nodes, or None if start is valid but no path is found.""" if path == []: # Test for presence of start in self. # Let the KeyError propagate out. nodes = self[start] path = [start] else: path = path + [start] if start == end: return path nodes = self[start] shortest = None for node in nodes: if node not in path: try: newpath = self._shortest_path(node, end, path) except KeyError: pass else: if newpath: if not shortest or len(newpath) < len(shortest): shortest = newpath return shortest missing=object() class Index(dict): """Index(*indices) -> A container of objects, indexed by attribute. Sometimes, we want to store objects in an associative array, but need lookup tools which are faster than simply iterating through each object in the array. For example, given a set of 1,000,000 Artwork objects, it might be too expensive to find all works by Picasso by examining each Artwork, accessing its 'Artist' attribute, and comparing that to 'Picasso'. One solution to this issue is to build multiple asociative arrays, where each keyset represents an index of attributes (such as 'Artist'). Databases are the most common implementers of this technique. Perhaps a more complex diagram will help: 'Name' 'Artist' 'Rate' Attributes / \ | / \ Guer Frenzy Picasso 125.99 300 "Handles" | | | | | | | / \ | | obj1 obj2 obj1 obj2 obj1 obj2 ^ ^ ^ ^ ^ ^ | | | | | | | +----------|----+-----------|-------+- obj2 | | | +------------------+----------------+- obj1 Notice that each of the two objects is referenced exactly three times; that is, once for each attribute which we have declared to be an indexed attribute. Also notice that these are references, not copies. Our implementation of such an Index looks like this: {'Name': {'Guernica': [obj1], 'Frenzy': [obj2]}, 'Artist': {'Picasso': [obj1, obj2]}, 'Rate': {300.00: [obj2], 125.99: [obj1]}, } A note about naming: nested dictionaries are somewhat difficult to keep in your head, because one level's "key" is another level's "value". In the Index class, the names are: Index = {attribute: {handle: bucket, handle: bucket, ..., }, attribute: {handle: bucket, handle: bucket, ..., }, } ...where each 'bucket' is a list of objects. In the example above, 'Artist' is an attribute, 'Picasso' is a handle (one of the existing values of obj.Artist), and [obj1, obj2] is a bucket. store(obj): add obj to each index based on attributes of obj. retrieve(attr, cmpfunc, *args): match objects based on indexed attributes. attr = the indexed attribute to look up. cmpfunc = a callable to which we will pass each instance attribute, expecting a true or false result. For example, to retrieve all objects with a 'Rate' attribute greater than 200, call: retrieve('Rate', lambda x: x > 200) --OR-- retrieve('Rate', operator.gt, 200) *args: any extra arguments to be passed to cmpfunc. """ def __init__(self, *indices): for attr in indices: self[attr] = {} def store(self, obj): """Add obj to self, one reference per index.""" for attr, index in self.iteritems(): bucket = index.setdefault(getattr(obj, attr), []) if obj not in bucket: bucket.append(obj) def reindex(self, obj, attr, oldhandle=missing): """The attr of obj has changed. Move it to the correct index.""" index = self[attr] if oldhandle is missing: for handle, bucket in index.iteritems(): if obj in bucket: bucket.remove(obj) if len(bucket) == 0: del index[handle] break else: self._deindex(obj, attr, oldhandle) bucket = index.setdefault(getattr(obj, attr), []) if obj not in bucket: bucket.append(obj) def _deindex(self, obj, attr, handle): index = self[attr] try: bucket = index[handle] except KeyError: pass else: try: bucket.remove(obj) except ValueError: pass if len(bucket) == 0: del index[handle] def remove(self, obj): """Delete obj from each index.""" for attr in self.iterkeys(): handle = getattr(obj, attr) self._deindex(obj, attr, handle) def handles(self, attr): """Generator for all handles for the attribute.""" # Just let KeyErrors propagate outward. for handle in self[attr].iterkeys(): yield handle def bucket(self, attr, handle): """Obtain a list of all objects in a given bucket.""" # Just let KeyErrors propagate outward. return self[attr][handle] def exists(self, attr, handle): """Return True if the handle exists in attr, False otherwise.""" return self[attr].has_key(handle) def retrieve(self, attr, cmpfunc=None, *args): """Obtain stored objects based on the value of an attribute. If cmpfunc is None, return all objects. Otherwise, we include each bucket's objects if cmpfunc(handle, *args) is True. """ product = [] if cmpfunc is None: for bucket in self[attr].itervalues(): product.extend(bucket) else: for handle, bucket in self[attr].iteritems(): if cmpfunc(handle, *args): product.extend(bucket) return product class Prism(object): """Prism(*indices) -> A colection which simplifies attribute retrieval. For many homogeneous collections, it is common to index the contained objects by an attribute of those objects. For example, if all of your objects have an 'id' attribute, you might collect them in a dictionary where each key is the id, and each value is the corresponding object. Some collections, however, benefit from lookups in multiple directions, which we might call 'facets' of the contained objects. The Prism class provides the ability to specify multiple indices/facets (all indices must be unique). In addition, multi-facet lookups are most commonly used to retrieve, not the contained object, but one of the other facets of that object. The Prism class provides this behavior via 'function refraction'; if you call Prism.far_facet_name(facet_name=value), this is equivalent to the dict-style call: indexes[facet_name].get(value).far_facet_name. For example, if you have the 'id' attribute of an object, and want to retrieve the 'color' attribute, call Prism.color(id=392). However, rather than store objects, the Prism stores values; you can think of it as a table instead of a set of objects. The issue with this is that, if one of your facets isn't a unique index, you'll get back the first match, not all matches. But sometimes that's what you want. Finally, there aren't any convenience methods yet for modifying entries; you can add and retrieve pretty well, but not mutate or remove without mucking about with internals. row_number() should help a bit. """ def __init__(self, *facets): self.facets = {} for name in facets: self.facets[name] = [] self._add_refractor(name) def _add_refractor(self, name): """Add an instancemethod to self which looks up far facets.""" def refractor(self, **facet): """Return the '%s' value associated with the specified facet.""" % name if len(facet) != 1: raise TypeError(u'%s() takes exactly one keyword argument ' u'(%s given).' % (name, len(facet))) return self.facets[name][self.row_number(**facet)] setattr(self, name, new.instancemethod(refractor, self, Prism)) def add(self, **facets): for name, row in self.facets.iteritems(): row.append(facets.get(name)) def row_number(self, **facet): k, v = facet.popitem() f = self.facets[k] try: return f.index(v) except ValueError: raise ValueError("'%s' is not a known %s" % (v, k)) def row(self, **facet): number = self.row_number(**facet) return dict([(k, v[number]) for k, v in self.facets.iteritems()])