#!/usr/bin/python3
"""ANB notebook parser and ontology-DSL transformer.

NOTE(review): this section was restored from a whitespace-collapsed copy of
the file.  Line breaks inside the two Lark grammars and the nesting of a few
statements were reconstructed from syntax -- confirm against a pristine copy.
"""
import os, getopt
import sys, re
import readline, atexit    ## at exit save readline history
from pprint import pprint as pp

# Command line is parsed at import time: -d (debug), -s (shell), -g PAT (grep).
ops, args = getopt.getopt(sys.argv[1:], "dsg:")
ops = dict(ops)

from lark import Lark, InlineTransformer, inline_args
from owlready2 import *

### PARSER AMB + sem1 ======================================

anbgrammar = r'''
start : doc (doc_h)*
doc_h : "===" doc                 -> doc_h
      | /===[ \t]*\S.*/ doc       -> doc_h2
doc : txt_with_trip               -> doc
txt_with_trip :                   -> twt1
      | txt_with_trip "=" linev   -> twt2
      | txt_with_trip TXT         -> twt2
      | txt_with_trip trip        -> twt2
trip : "#/" linev -> trip1 // subjet focus
     | "#/?" linev -> trip0 // strong doubt
     | "#=" linev -> trip2 // alias
     | "#" rel linev -> trip3 // subj rel val
     | "#" rel "=" "{" txt_with_trip "}" -> trip4 // inline doc
     | "#" rel "{" pushlinev txt_with_trip "}" -> trip5 //
     | "#" "doc" pushlinev "{" txt_with_trip "}" -> trip6
     | "#" "doc" "{" txt_with_trip "}" -> trip7
     | "#" "bio" "{" txt_with_trip "}" -> trip8
     | "#" "{" pushlinev txt_with_trip "}" -> trip9
     | "*" linevdate -> trip10
     | /[+†]/ linevdate -> trip11
pushlinev : linev
linevdate : LINEV
linev : LINEV -> linev
rel : REL -> rel
REL .3 : /\w+([-:]\w+)*/
LINEV .3 : /(\.\.\.|\?|\w)[^\n#{}*+†=;]*|'[^']*'/
TXT .1 : /(http.*|[^#{}*+†=]|=(?!==)|#\s*\n|\{\{(.|\n)*?\}\})+/
FINALCOMS .6 : /__END__(.|\n)*/
WS .4 : /[ \t\n]/
COMS .3 : />.*/
%ignore COMS
%ignore FINALCOMS
%ignore WS
'''


def _clean_linev(v):
    """Return str(v) without trailing blanks, ';' or ','.

    A single final '.' is also dropped; a '.' preceded by another '.'
    (e.g. a trailing ellipsis) is left untouched.
    """
    v = re.sub(r'[\s;,]+$', '', str(v))
    return re.sub(r'([^\.])\.$', r'\1', v)


class CalculateAnbOnto(InlineTransformer):
    """Lark transformer for `anbgrammar`.

    While the tree is reduced it fills `sf.onto` with:
      "trip" -- list of (subject, relation, object) tuples,
      "indi" -- dict of individual names seen,
      "doc"  -- dict of document texts keyed by the counter `sf.c`,
      "docg" -- tuple of the top-level results (set by `start`).
    `sf.subj` is a stack of subjects; `sf.last` is the last named value.
    """

    def add3(sf, s, v, o):
        """Record the triple (s, v, o) and echo it to stdout."""
        sf.onto["trip"].append((s, v, o))
        print(f"==={s}={v}={o}")

    def addn(sf, n):
        """Increment the occurrence count of individual `n`."""
        sf.onto["indi"][n] = sf.onto["indi"].get(n, 0) + 1

    def s(sf):
        """Return the current subject (top of the subject stack)."""
        return sf.subj[-1]

    def pushlinev(sf, l):
        """Make `l` the current subject and the last value; return it."""
        sf.last = l
        sf.subj.append(l)
        return l

    def trip0(sf, l):
        """'#/?': push '?'+l as the current subject."""
        l = "?" + l
        sf.subj.append(l)
        sf.last = l
        return l

    def trip1(sf, l):
        """'#/': push l as the current subject."""
        sf.subj.append(l)
        sf.last = l
        return l

    def trip2(sf, l):
        """'#=': record an "alias" triple for the current subject."""
        sf.add3(sf.s(), "alias", l)
        sf.addn(l)
        return l

    def trip3(sf, r, l):
        """'# rel val': record (current subject, r, l)."""
        sf.add3(sf.s(), r, l)
        sf.last = l
        sf.addn(l)
        return l

    def trip4(sf, r, t):
        """'# rel = { ... }': record the inline text t under relation r."""
        sf.add3(sf.s(), r, t)
        return t

    def trip5(sf, r, l, t):
        """'# rel { val ... }': pop the subject pushed for the braces, then link to it."""
        sf.addn(l)
        sf.subj.pop()
        sf.add3(sf.s(), r, l)
        return l

    def trip6(sf, l, t):
        """'# doc val { ... }': link subject to l via "about" and attach t to l."""
        sf.addn(l)
        sf.subj.pop()
        sf.add3(sf.s(), "about", l)
        sf.add3(l, "inline-doc", t)
        return l

    def trip7(sf, d):
        """'# doc { ... }': record d under a fixed anonymous-document subject."""
        sf.add3("?anoymous-doc", "inline-doc", d)
        return d

    def trip8(sf, t):
        """'# bio { ... }': record t as "bio" of the current subject."""
        sf.add3(sf.s(), "bio", t)
        return t

    def trip9(sf, l, t):
        """'# { val ... }': pop the pushed subject and link to it with "rt"."""
        sf.subj.pop()
        sf.add3(sf.s(), "rt", l)
        return l

    def trip10(sf, l):
        """'* date': record a "birth" triple for the last named value."""
        sf.add3(sf.last, "birth", l)
        return f"* {l}\n"

    def trip11(sf, p, l):
        """'+ date' / '† date': record a "death" triple for the last named value."""
        sf.add3(sf.last, "death", l)
        return f"+ {l}\n"

    def twt1(sf):
        """Empty text."""
        return ""

    def twt2(sf, tw, x):
        """Append item x to the accumulated text tw, newline separated."""
        return f"{str(tw)}\n{x}"

    def linevdate(sf, v):
        """Cleaned date value (not registered as an individual)."""
        return _clean_linev(v)

    def linev(sf, v):
        """Cleaned value, also registered in onto["indi"]."""
        v = _clean_linev(v)
        sf.onto["indi"][v] = 1
        return v

    def rel(sf, r):
        return str(r)

    def doc(sf, t):
        return t

    def doc_h(sf, t):
        """Untitled '===' document: store it and reset the subject stack."""
        sf.c += 1
        sf.onto["doc"][sf.c] = t
        sf.subj = ["???"]
        return t

    def doc_h2(sf, tit, t):
        """Titled '=== title' document: store it, record an "isdoc" triple."""
        sf.c += 1
        sf.onto["doc"][sf.c] = f"={tit}\n{t}"
        sf.add3(tit, "isdoc", f'doc-{sf.c}')
        sf.subj = ["???"]
        return t

    def start(sf, *d):
        """Top rule: keep the document results and return the whole onto dict."""
        sf.onto["docg"] = d
        ##print("=====start=",sf.onto)
        return sf.onto

    def __init__(sf):
        sf.onto = {
            "doc": {},
            "indi": {},
            "trip": [],
        }
        sf.subj = ["??2"]
        sf.last = "??1"
        sf.id = 999
        sf.c = 0    ## document counter


def anbpp(anb):
    """Pretty-print a parsed ANB structure."""
    pp(anb)
    # Former detailed report, kept for reference:
    # print(f"#### ANB {file}\n",
    #       "\nIndividuos:\n", anb["indi"],
    #       "\ntriplos:\n", anb["trip"],
    #      )


def anbparse(fs):
    """Parse every file in `fs` with `anbgrammar`; return the list of results.

    On the first failure the error is printed and the process exits with
    status 1.

    NOTE(review): a single transformer instance is shared by all files, so
    every element of the returned list is presumably the same accumulated
    dict -- confirm that this is intended.
    """
    r = []
    parser = Lark(anbgrammar, parser='lalr', transformer=CalculateAnbOnto())
    for file in fs:
        with open(file, "r") as f:
            try:
                anb = parser.parse(f.read())
                r.append(anb)
            except BaseException as err:
                print(f'Syntax error:\n {file} {err}')
                exit(1)
    return r

#====================END of anb parser

grammar_off = r'''
start : ontologia "."? FINALPY?
ontologia : "ontologia"i ID (conceitos | relacoes | individuos| triplos)*
conceitos : "conceitos"i "{" conceito ("," conceito )* ","? "}"
relacoes : RELS "{" relacao ("," relacao )* ","? "}"
individuos : INDS "{" individuo ("," individuo)* ","? "}"
triplos : "triplos"i "{" triplo* "}"
conceito : id
         | id "[" dataprops "]" -> conceito_2
dataprops : (id ":" type ","? )+
type : id
relacao : id
        | id "/" id -> relacao_inv
individuo : id
          | id "=" classprops -> individuo1
triplo : id "=" (id "=>" val ","? )+ ";"
       | id "=" "iof" "=>" classprops ";" -> individuo1
classprops : id ( "[" id (":"|"=") val ("," id (":"|"=") val)* ","? "]" )?
val : id -> id
    | STR -> id
    | TOK -> id
id : ID
INDS .4 : /individuos|indivíduos/i
RELS .4 : /relações|relacoes/i
ID .3 : /\w+(-\w+)*/
// may be in the future...
TXT .1 : /"""(.|\n)*?"""/
STR .1 : /"""(.|\n)*?"""|"([^"])*"/
TOK .1 : /'([^'])*'/
WS .1 : /[ \t\n]/
COMS .3 : /#.*/
FINALPY .5 : /__PYTHON__(.|\n)*?(?=(__END__|$))/
FINALCOMS .6 : /__END__(.|\n)*/
%ignore COMS
%ignore FINALCOMS
%ignore WS
'''


class CalculateOnto_off(InlineTransformer):
    """Lark transformer for `grammar_off`.

    Fills `self.onto` with concepts ("con"), data properties ("datap"),
    relations ("rel"), inverses ("inv"), individuals ("ind") and triples
    ("tri"); most entries carry an "owl" list of Python source lines.
    """

    def __init__(self):
        self.onto = {
            "meta": {"name": None},
            "con": {},
            "datap": {},
            "rel": {},    ##FIXME {"is-a":{},"iof":{},"pof":{}},
            "inv": {},
            "ind": {},
            "tri": {}}

    def start(self, o, py=None):
        """Run the optional __PYTHON__ tail, then return the onto dict."""
        if py:
            jjexec(re.sub(r'__PYTHON__', '', py))
        return self.onto

    def ontologia(self, name, *v):
        self.onto["meta"]["name"] = name

    def id(self, i):
        return str(i)

    def dataprops(self, *v):
        return l2dict(v)

    def type(self, v):
        return v

    def individuo(self, c):
        """Declare individual c as an instance of Thing."""
        nc = norm(c)
        self.onto["ind"][nc] = {"dataprop": {},
                                "owl": [f'{nc} = Thing("{nc}")']}
        return c

    def classprops(self, c, *p):
        """Return (normalised class name, dict of property -> value)."""
        nc = norm(c)
        props = l2dict(p)    ## FIXME continuar
        return (nc, props)

    def individuo1(self, c1, p):
        """Declare individual c1 of the class in p and add its property triples."""
        nc1 = norm(c1)
        nc2, x = p
        self.onto["ind"][nc1] = {"dataprop": {},
                                 "owl": [f'{nc1} = {nc2}("{nc1}")']}
        for a in x:
            add_triplo(self, c1, a, x[a])
        return c1

    def relacao(self, c):
        """Declare object property c."""
        nc = norm(c)
        self.onto["rel"][nc] = {"objprop": {},
                                "owl": [f'class {nc}(ObjectProperty): pass']}
        return c

    def relacao_inv(self, i1, i2):
        """Declare object properties i1 and i2 as inverses of each other."""
        ni1 = norm(i1)
        ni2 = norm(i2)
        self.onto["inv"][ni2] = i1
        self.onto["inv"][ni1] = i2
        self.onto["rel"][ni1] = {"objprop": {},
                                 "owl": [f'class {ni1}(ObjectProperty): pass']}
        self.onto["rel"][ni2] = {"objprop": {},
                                 "owl": [f'class {ni2}(ObjectProperty): pass',
                                         f'{ni1}.inverse_property = {ni2}',
                                         f'{ni2}.inverse_property = {ni1}']}
        return (i1, i2)

    def triplo(self, i1, i2, i3, *p):
        """Add (i1, i2, i3) plus one triple per further (pred, obj) pair in p."""
        add_triplo(self, i1, i2, i3)
        for x, y in l2lofpairs(p):
            add_triplo(self, i1, x, y)
        return (i1, i2, i3, p)

    def conceito(self, c):
        """Declare concept c as a subclass of Thing."""
        self.onto["con"][c] = {"dataprop": {},
                               "owl": [f'class {c}(Thing):pass']}
        return c

    def conceito_2(self, c, p):
        """Declare concept c with data properties p (dict name -> type)."""
        self.onto["con"][c] = {"dataprop": p,
                               "owl": [f'class {c}(Thing):pass']}
        datap = self.onto["datap"]
        # Iterate items so the range is read with the *raw* key; indexing p
        # with the normalised name failed for ids containing '-'.
        for raw, dtype in p.items():
            x = norm(raw)
            entry = datap.setdefault(x, {"domain": {}, "range": {}})
            entry["domain"][c] = 1
            entry["range"][dtype] = 1
            entry["owl"] = [f'class {x}(DataProperty, FunctionalProperty):pass']
            # domain = {c}
            # range = {p[x]}
        return c


def add_triplo(self, s, p, o):
    """Register triple (s, p, o) in `self.onto["tri"]`.

    `self` is a CalculateOnto_off instance (this is a plain function, called
    as add_triplo(self, ...)).  Unknown subjects, predicates or objects are
    reported on stdout but the triple is stored anyway.
    """
    onto = self.onto
    ns = norm(s)
    npred = norm(p)
    no = norm2(o)
    if ns not in onto["con"] and ns not in onto["ind"]:
        print(f"ERROR: {ns} desconhecido")
    if (npred not in onto["rel"] and npred not in onto["datap"]
            and npred not in ["is_a", "iof", "pof"]):
        print(f"Err: rel {npred} desconhecido")
    if (no not in onto["con"] and no not in onto["ind"]
            and not re.match(r'["\'0-9]', no)):
        print(f"ERROR: {no} desconhecido")
    # "datap" keys are stored normalised, so test the normalised predicate.
    if npred in onto["datap"] and ns in onto["ind"]:
        onto["tri"][(ns, npred, o)] = {"owl": [f'{ns}.{npred} = {o}']}
    else:
        onto["tri"][(ns, npred, no)] = {"owl": [f'{ns}.{npred}.append({no})']}


### util funcs

def l2dict(v):
    """[a b c d] -> {a:b, c:d}   list to dict."""
    return dict(l2lofpairs(v))


def l2lofpairs(v):
    """[a b c d] -> [(a b) (c d)]   list to list of pairs (a zip iterator)."""
    return zip(v[0::2], v[1::2])


def norm(c):
    """Turn c into an identifier: non-word chars -> '_', one edge '_' trimmed per side."""
    a = re.sub(r'\W', '_', c.strip())
    a = re.sub(r'^_|_$', '', a)
    return a


def norm2(c):
    """Like norm for names, but leave quoted strings and numbers untouched."""
    # Character class fixed from '0.9' to '0-9': any leading digit marks a literal.
    if re.match(r'["\'0-9]', c):
        return c
    return re.sub(r'\W', '_', c.strip())


def jjexec(x):
    """Execute x in the module globals and report errors to stdout.

    NOTE(review): x comes from the __PYTHON__ section of a parsed file and is
    run with exec -- only use with trusted input.
    """
    try:
        exec(x, globals())
    except BaseException as err:
        print(f'Syntax error: {err}')


ONTO = "nada"
graph = default_world.as_rdflib_graph()

### OWL generator
# ======================================

def gera(ont):
    """Build the ontology described by `ont` and save it as ONT_<name>.xml.

    `ont` is a dict with "con", "rel", "datap", "ind" and "tri" sections whose
    entries each carry an "owl" list of Python source lines, plus
    ont["meta"]["name"].  The lines are joined under `with ONTO:` and run with
    exec; errors are printed, not raised.  Rebinds the globals ONTO and graph.
    """
    global ONTO
    global graph
    ONTO = get_ontology("a:")    # ONTO = get_ontology(ont["meta"]["name"])
    ONTO.name = "#b:"            ## ont["meta"]["name"]
    ONTO.base_iri = "a:"         ## "ol:"
    with ONTO:
        class iof(ObjectProperty): pass
        class is_a(ObjectProperty): pass
        class pof(ObjectProperty): pass
    owl = [i
           for t in ["con", "rel", "datap", "ind", "tri"]
           for elem in ont[t]
           for i in ont[t][elem]["owl"]]
    com = "with ONTO:\n " + "\n ".join(owl)
    if "-d" in ops:
        print("Debug ", com)
    try:
        exec(com, globals())
    except BaseException as err:
        print(f'Syntax error: {err}')
    # with ONTO:
    #    sync_reasoner()
    graph = default_world.as_rdflib_graph()
    ONTO.save(file=f"ONT_{ont['meta']['name']}.xml")


def loadont(file):
    """Load the ontology at `file` into the globals ONTO and graph."""
    global ONTO
    global graph
    ONTO = get_ontology(file)
    ONTO.load()    # the original called load() twice; once is enough
    # ONTO.base_iri = "#"    ## "ol:"
    graph = default_world.as_rdflib_graph()


### Shell ======================================

def sh(ONTO):
    """Interactive shell over the ontology, with tab completion and history.

    Input shortcuts (rewritten to Python, then run with exec in the module
    globals):  i_NAME, c_NAME, /PATTERN, ?VARS{SPARQL-body ; anything else is
    evaluated as pp(<input>).  "quit" or EOF leaves the shell.  exec/globals()
    means the global ONTO is the one the commands see.
    """
    class SimpleCompleter:
        """readline completer over a fixed list of names."""

        def __init__(self, options):
            self.options = sorted(options)

        def complete(self, text, state):
            # print(text)
            if state == 0:
                # First call for this text: (re)compute the candidate list.
                if text:
                    self.matches = [s for s in self.options
                                    if s and s.startswith(text)]
                else:
                    self.matches = self.options[:]
            try:
                return self.matches[state]
            except IndexError:
                return None

    OPTIONS = [x.name for x in ONTO.classes()]
    OPTIONS += [x.name for x in ONTO.individuals()]
    OPTIONS += [x.name for x in ONTO.properties()]

    readline.set_completer(SimpleCompleter(OPTIONS).complete)
    readline.parse_and_bind('tab: complete')
    histfile = os.path.join(os.path.expanduser("~"), ".onto_history")
    try:
        readline.read_history_file(histfile)
    except FileNotFoundError:
        pass
    # Apply the limit even on the first run, when no history file exists yet.
    readline.set_history_length(1000)
    atexit.register(readline.write_history_file, histfile)

    while True:
        try:
            c = input("> ").strip()
        except EOFError:
            print()
            break
        if c == "quit":
            break
        if not re.search(r'\S', c):
            continue
        if re.match(r'i_(\S+)', c):
            com = re.sub(r'i_(.+)', r'pp(po(ONTO.\1))', c)
        elif re.match(r'c_(\S+)', c):
            com = re.sub(r'c_(.+)', r'pp(cpp(ONTO.\1))', c)
        elif re.match(r'/(\S+)', c):
            com = re.sub(r'^/(.+)', r'pp(ONTO.search(iri="*\1*"))', c)
        elif re.match(r'\?(\S+)', c):
            com = re.sub(r'^\?(.+?)\{(.*)',
                         r'pp(pptable(graph.query("""SELECT DISTINCT ?\1 WHERE {\2""")))',
                         c)
        else:
            com = f'pp({c})'
        try:
            exec(f"{com}", globals())
        except BaseException as err:
            print(f'Error: {err}')


def ontogrep(pat, ONTO):
    """Print every entity of ONTO whose IRI contains `pat`.

    Properties are shown as "property"; other entities through po().
    Errors are printed, not raised.  The search is called directly instead of
    through generated source + exec, so quotes in `pat` are harmless.
    """
    try:
        with ONTO:
            for a in ONTO.search(iri=f"*{pat}*"):
                print("===", a)
                if a in ONTO.properties():
                    pp("property")
                else:
                    pp(po(a))
    except BaseException as err:
        print(f'Syntax error: {err}')


def cpp(cl):
    """Summary of class cl: its is_a list and its instances."""
    return {"is_a": cl.is_a, "instances": cl.instances()}


def allc():
    """(name, is_a, instances) for every class of the global ONTO."""
    return [(c.name, c.is_a, c.instances()) for c in ONTO.classes()]


def alli():
    """po() of every individual of the global ONTO."""
    return [po(c) for c in ONTO.individuals()]


def cpo():    ## class, prediate, object FIXME
    return [(suj, p.name, o)
            for suj in ONTO.classes()
            for p in ONTO.properties()
            for o in p[suj]]


def itri():    ## individuals, predicate, object
    return [(suj, p.name, o)
            for suj in ONTO.individuals()
            for p in ONTO.properties()
            for o in p[suj]]


def po(suj):    ## predicate object
    """(suj, suj.is_a, {property name: values}) for properties with values on suj."""
    return (suj,
            suj.is_a,
            {p.name: p[suj] for p in ONTO.properties() if p[suj]})


def pptable(t):
    """Convert an iterable of rows into a list of tuples of strings."""
    return [tuple(str(y) for y in x) for x in t]


def jjql(s):
    """Run a query on the global graph; '?v {...' / '* {...' are shorthand
    for 'select distinct ?v where {...'.  Returns pptable() of the rows."""
    s = re.sub(r'^\s*(\?.+?)(\{.*)', r'select distinct \1 where \2', s)
    s = re.sub(r'^\s*(\*.+?)(\{.*)', r'select distinct \1 where \2', s)
    return pptable(list(graph.query(s)))


def q(x):
    pp(jjql(x))


def i(x=None):
    pp(po(x) if x else alli())


def c(x=None):
    pp(cpp(x) if x else allc())


## Main ========

def main():
    """Load ontology files, parse the remaining arguments as ANB, then run
    the -g grep and/or the -s shell."""
    global p
    global ONTO
    global graph
    anbs = []
    for f in args:
        # Test and load the *current* argument (the original always used args[0]).
        if re.match(r'.*\.(owl|rdf|xml)$', f):
            loadont(f)
        else:
            anbs.append(f)
    r = anbparse(anbs)
    for p in r:
        anbpp(p)
        # NOTE(review): the collapsed source does not show whether this exit
        # sat inside or after the loop; either way gera(p) was never reached.
        sys.exit(1)
        gera(p)

    if "-g" in ops:
        ontogrep(ops["-g"], ONTO)

    ### import code
    ### code.interact(local=locals())
    if "-s" in ops:
        sh(ONTO)


if __name__ == '__main__':
    main()

_author_ = "J.Joao"