Sparta Redland
Last night I spent a few hours hacking Sparta to use the Redland API as it’s underlying store. It’s not pretty code by any means as I simply added the new imports and kept tweaking until my personal use case was working. I haven’t even tried to make spartaTest.py work against it.
#!/usr/bin/env python
"""
sparta.py (a Simple API for RDF)
Copyright 2001-2004 Mark Nottingham <mnot @pobox.com>
Portions Copyright 2005 John C Barstow <jbowtie @amathaine.com>
Sparta is a simple API for RDF that binds RDF nodes to Python
objects and RDF arcs to attributes of those Python objects. As
such, it can be considered a \"data binding\" from RDF to Python.
THIS SOFTWARE IS SUPPLIED WITHOUT WARRANTY OF ANY KIND, AND MAY BE
COPIED, MODIFIED OR DISTRIBUTED IN ANY WAY, AS LONG AS THIS NOTICE
AND ACKNOWLEDGEMENT OF AUTHORSHIP REMAIN.
Requires rdflib <http ://www.rdflib.net/>.
TODO:
* redland support; http://redland.opensource.ac.uk/
* take a redland context as a factory arg
* take object type information from its rdf:type too (?)
* type list members?
* complete schema type support (date/time types; wait for PEP 321)
* document / refactor
* unit tests
CHANGES:
* rdf:Seq support (just like lists) (needs testing)
* if a property isn't unique, it'll return a PropertySet upon get.
* Factory takes an optional 'schema_store' arg to keep schemas separate.
If not specified, the store will be used.
"""
import urlparse, base64, types, sets
import RDF
from RDF import Uri as URI
from RDF import Node as BNode
#from rdflib.constants import FIRST, REST, NIL
RDFS_RANGE = "http://www.w3.org/2000/01/rdf-schema#range"
LIST = "http://www.w3.org/1999/02/22-rdf-syntax-ns#List"
SEQ = "http://www.w3.org/1999/02/22-rdf-syntax-ns#Seq"
__version__ = "0.6.7"
RDF_SEQi = "http://www.w3.org/1999/02/22-rdf-syntax-ns#_%s"
MAX_CARD = URI("http://www.w3.org/2002/07/owl#maxCardinality")
def loadFile(filename):
uri=RDF.Uri(string="file:"+filename)
storage=RDF.Storage(storage_name="hashes",
name="test",
options_string="new='yes',hash-type='memory',dir='.'")
model=RDF.Model(storage)
parser=RDF.Parser('raptor')
for s in parser.parse_as_stream(uri,uri):
model.add_statement(s)
return model
class TripleStore:
def load(self, filename):
self.model=loadFile(filename)
self.prefix_ns_map = {}
self.ns_prefix_map = {}
def prefix_mapping(self, key, url):
self.prefix_ns_map[key] = url
self.ns_prefix_map[str(url)] = key
def triples(self, (s, p, o)):
qs = RDF.Statement(subject = s,
predicate = p,
object = o)
for statement in self.model.find_statements(qs):
yield statement.subject,statement.predicate,statement.object
def add(self, (s, p, o)):
qs = RDF.Statement(s, p,o)
self.model.add_statement(qs)
def subjects(self, p, o):
if isinstance(p, str):
p = URI(p)
return self.model.get_sources(p,o)
class ThingFactory:
"""
Fed a store, return a factory that can be used to instantiate
Things into that world.
"""
def __init__(self, store, schema_store=None):
self.store = store
if schema_store is not None:
self.schema_store = schema_store
else:
self.schema_store = self.store
#self.update_prefix_mapping()
def __call__(self, name, **props):
return _Thing(self.store, self.schema_store, name, props)
def update_prefix_mapping(self):
"""
Update the prefix-to-namespace URI mapping. store.parse()
and possibly other methods will blow it away; call it
afterwards (or just set your prefixes afterwareds).
"""
for namespace, prefix in self.store.ns_prefix_map.items():
self.store.prefix_ns_map[prefix] = namespace
class _Thing:
""" An RDF Resource, as uniquely identified by a URI. Properties
of the Resource are avaiable as attributes; for example:
.prefix_localname is the property in the namespace mapped
to the \"prefix\" prefix, with the localname \"localname\".
"""
def __init__(self, store, schema_store, name, props={}):
self._store = store
self._schema_store = schema_store
self._object_types = {}
if name is None:# or name.isdigit():
self._name = BNode()
#elif isinstance(name, ID):
# self._name = name
else:
self._name = self._AttrToURI(name)
for attr, obj in props.items():
try:
self.__setattr__(attr, obj)
except TypeError: ### hack
self.__getattr__(attr).add(obj)
def __getattr__(self, attr):
if attr[0] == '_':
return self.__dict__[attr]
else:
try:
pred = self._AttrToURI(attr)
except ValueError:
raise AttributeError
results = self._store.triples((self._name, pred, None))
if self._isUniqueObject(pred):
try:
obj = results.next()[2]
obj_type = self._getObjectType(pred, obj)
return self._rdfToPython(obj, obj_type)
except StopIteration:
raise AttributeError
else:
return PropertySet(self, pred)
def __setattr__(self, attr, obj):
if attr[0] == '_':
self.__dict__[attr] = obj
else:
try:
pred = self._AttrToURI(attr)
obj_type = self._getObjectType(pred, obj)
if self._isUniqueObject(pred):
self._store.remove((self._name, pred, None))
self._store.add((self._name, pred, self._pythonToRdf(obj, obj_type)))
elif isinstance(obj, (sets.BaseSet, PropertySet)):
PropertySet(self, pred, obj.copy())
else:
raise TypeError
except ValueError:
raise AttributeError
def __delattr__(self, attr):
if attr[0] == '_':
del self.__dict__[attr]
else:
self._store.remove((self._name, self._AttrToURI(attr), None))
def _rdfToPython(self, obj, obj_type):
"""Given a RDF object and its type, return the equivalent Python object."""
#print "rdfToPython", obj, obj.__class__
if isinstance(obj, RDF.Node) and obj.is_literal(): # typed literals
return SchemaToPython.get(obj_type, SchemaToPythonDefault)[0](obj)
elif obj_type == LIST:
return self._rdfToList(obj)
elif obj_type == SEQ:
l, i = [], 1
while True:
try:
item = self._store.triples((obj, URI(RDF_SEQi % i), None)).next()[2]
l.append(self._rdfToPython(item, None)) ### type?
i += 1
except StopIteration:
return l
elif isinstance(obj, RDF.Node) and obj.is_resource():
return self.__class__(self._store, self._schema_store, obj)
else:
raise ValueError
def _pythonToRdf(self, obj, obj_type):
"""Given a Python object and its type, return the equivalent RDF object."""
if obj_type == LIST:
blank = BNode()
self._listToRdf(blank, obj) ### this actually stores things...
return blank
elif obj_type == SEQ: ### so will this
blank = BNode()
i = 1
for item in obj:
self._store.add((blank, URI(RDF_SEQi % i), self._pythonToRdf(item, None))) ### type?
i += 1
return blank
elif isinstance(obj, self.__class__):
return obj._name
else:
return RDF.Node(SchemaToPython.get(obj_type, SchemaToPythonDefault)[1](obj))
def _rdfToList(self, subj):
"""Given a RDF list, return the equivalent Python list."""
try:
first = self._store.triples((subj, FIRST, None)).next()[2]
except StopIteration:
return []
try:
rest = self._store.triples((subj, REST, None)).next()[2]
except StopIteration:
return ValueError
return [self._rdfToPython(first, None)] + self._rdfToList(rest) ### type first?
def _listToRdf(self, subj, members):
"""Given a Python list, return the eqivalent RDF list."""
first = self._pythonToRdf(members[0], None) ### type members[0]?
self._store.add((subj, FIRST, first))
if len(members) > 1:
blank = BNode()
self._store.add((subj, REST, blank))
self._listToRdf(blank, members[1:])
else:
self._store.add((subj, REST, NIL))
def _AttrToURI(self, method_name):
"""Given an attribute, return a URIRef."""
#print "AttrToURI", method_name
if isinstance(method_name, RDF.Node) and method_name.is_resource():
#print "resolve", method_name.uri
return method_name.uri
prefix, localname = method_name.split("_", 1)
return URI(\"\".join([self._store.prefix_ns_map[prefix], localname]))
def _URIToAttr(self, uri):
"""Given a URIRef or a URI, return an attribute."""
for ns_uri, prefix in self._store.ns_prefix_map.items():
#print uri, uri.__class__
if ns_uri == str(uri)[:len(ns_uri)]:
return "_".join([prefix, str(uri)[len(ns_uri):]])
raise ValueError
def _getObjectType(self, pred, obj):
"""Given a predicate and an object, figure out the object's type."""
if self._object_types.has_key(pred):
return self._object_types[pred]
else:
try:
obj_type = self._schema_store.triples((pred, RDFS_RANGE, None)).next()[2]
except StopIteration:
obj_type = None
self._object_types[pred] = obj_type
return obj_type
def _isUniqueObject(self, pred):
"""Given a predicate, figure out if the object has a cardinality greater than one."""
try:
obj_maxcard = self._schema_store.triples((pred, MAX_CARD, None)).next()[2]
except StopIteration:
return False
if isinstance(obj_maxcard, RDF.Node) and obj_maxcard.is_literal():
obj_maxcard = str(obj_maxcard)
elif isinstance(obj_maxcard, RDF.Node) and obj_maxcard.is_blank():
return True
if int(obj_maxcard) == 1:
return True
else:
return False
def __repr__(self):
return self._name
def __str__(self):
return self._URIToAttr(self._name)
def properties(self):
"""List unique properties."""
return [str(self.__class__(self._store, self._schema_store, p) )
for (s,p,o) in self._store.triples((self._name, None, None))]
class PropertySet:
"""
A set interface to the object(s) of a non-unique RDF predicate. Interface is a subset
(har, har) of sets.Set. .copy() returns a sets.Set instance.
"""
def __init__(self, subject, predicate, iterable=None):
self._subject = subject
self._predicate = predicate
self._store = subject._store
if iterable is not None:
for obj in iterable:
self.add(obj)
def __len__(self):
return len(list(self._store.triples((self._subject._name, self._predicate, None))))
def __contains__(self, obj):
if not isinstance(obj, self._subject.__class__):
obj_type = self._subject._getObjectType(self._predicate, obj)
obj = Literal(SchemaToPython.get(obj_type, SchemaToPythonDefault)[1](obj))
try:
self._store.triples((self._subject._name, self._predicate, obj)).next()
return True
except StopIteration:
return False
def __iter__(self):
for obj in self._store.triples((self._subject._name, self._predicate, None)):
obj_type = self._subject._getObjectType(self._predicate, obj)
yield self._subject._rdfToPython(obj[2], obj_type)
def copy(self):
return sets.Set(self)
def add(self, obj):
obj_type = self._subject._getObjectType(self._predicate, obj)
self._store.add((self._subject._name, self._predicate,
self._subject._pythonToRdf(obj, obj_type)))
def remove(self, obj):
if not obj in self:
raise KeyError
self.discard(obj)
def discard(self, obj):
if not isinstance(obj, self._subject.__class__):
obj_type = self._subject._getObjectType(self._predicate, obj)
obj = Literal(SchemaToPython.get(obj_type, SchemaToPythonDefault)[1](obj))
self._store.remove((self._subject._name, self._predicate, obj))
def clear(self):
self._store.remove((self._subject, self._predicate, None))
SchemaToPythonDefault = (unicode, unicode)
SchemaToPython = { # (schema->python, python->schema) Does not validate.
'http://www.w3.org/2001/XMLSchema#string': (unicode, unicode),
'http://www.w3.org/2001/XMLSchema#normalizedString': (unicode, unicode),
'http://www.w3.org/2001/XMLSchema#token': (unicode, unicode),
'http://www.w3.org/2001/XMLSchema#language': (unicode, unicode),
'http://www.w3.org/2001/XMLSchema#boolean': (bool, lambda i:unicode(i).lower()),
'http://www.w3.org/2001/XMLSchema#decimal': (float, unicode),
'http://www.w3.org/2001/XMLSchema#integer': (long, unicode),
'http://www.w3.org/2001/XMLSchema#nonPositiveInteger': (int, unicode),
'http://www.w3.org/2001/XMLSchema#long': (long, unicode),
'http://www.w3.org/2001/XMLSchema#nonNegativeInteger': (int, unicode),
'http://www.w3.org/2001/XMLSchema#negativeInteger': (int, unicode),
'http://www.w3.org/2001/XMLSchema#int': (int, unicode),
'http://www.w3.org/2001/XMLSchema#unsignedLong': (long, unicode),
'http://www.w3.org/2001/XMLSchema#positiveInteger': (int, unicode),
'http://www.w3.org/2001/XMLSchema#short': (int, unicode),
'http://www.w3.org/2001/XMLSchema#unsignedInt': (long, unicode),
'http://www.w3.org/2001/XMLSchema#byte': (int, unicode),
'http://www.w3.org/2001/XMLSchema#unsignedShort': (int, unicode),
'http://www.w3.org/2001/XMLSchema#unsignedByte': (int, unicode),
'http://www.w3.org/2001/XMLSchema#float': (float, unicode),
'http://www.w3.org/2001/XMLSchema#double': (float, unicode), # doesn't do the whole range
# duration
# dateTime
# time
# date
# gYearMonth
# gYear
# gMonthDay
# gDay
# gMonth
# hexBinary
'http://www.w3.org/2001/XMLSchema#base64Binary': (base64.decodestring, lambda i:base64.encodestring(i)[:-1]),
'http://www.w3.org/2001/XMLSchema#anyURI': (str, str),
}
if __name__ == '__main__':
# use: "python -i sparta.py [URI for RDF file]"
import sys
store = TripleStore()
store.parse(sys.argv[-1])
Thing = ThingFactory(store)
I also have put together a preliminary RDF-object mapper.
#!/usr/bin/env python """ spartaObj.py (object mapper for Sparta) Copyright 2005 John C Barstow <jbowtie @amathaine.com> THIS SOFTWARE IS SUPPLIED WITHOUT WARRANTY OF ANY KIND, AND MAY BE COPIED, MODIFIED OR DISTRIBUTED IN ANY WAY, AS LONG AS THIS NOTICE AND ACKNOWLEDGEMENT OF AUTHORSHIP REMAIN. TODO: * document / refactor * unit tests * relicense under GPL """ import sparta TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" from RDF import Uri as URI import RDF class ObjectFactory(sparta.ThingFactory): def __init__(self, store, schema_store=None): sparta.ThingFactory.__init__(self,store,schema_store) self.typemap = {} def __call__(self,name, **props): thing = sparta._Thing(self.store, self.schema_store, name, props) for t in thing.rdf_type: if isinstance(t, RDF.Node): print "node URI?", t.is_resource() if self.typemap.has_key(sparta.URI(t._name)): return self.typemap[sparta.URI(t._name)](thing) return thing def MapType(self, uri, targettype): self.typemap[uri] = targettype def instancesOf(self, uriText): l = [self(s) for s in self.store.subjects(TYPE, URI(uriText))] #print "instances of", uriText, l return l class thingType: def __init__(self, thing): self.thing = thing def __getattr__(self, attr): if self.__dict__.has_key(attr): return self.__dict__[attr] return self.thing.__getattr__(attr) def __setattr__(self,attr,obj): if attr==\"thing\": self.__dict__[attr]=obj return try: self.thing.__setattr__(attr,obj) except AttributeError: self.__dict__[attr]=obj def properties(self): return self.thing.properties()