Skip to content
Snippets Groups Projects
Commit 52c13e08 authored by Henrik tom Wörden's avatar Henrik tom Wörden
Browse files

ENH: added files for import and export

parent 2ac36963
No related branches found
No related tags found
No related merge requests found
......@@ -3,3 +3,4 @@ __pycache__
.coverage
cache.db
*.egg-info
.docker/cert
import argparse
from lxml import etree
import caosdb as db
def type_converted_list(val, pr):
# TODO exists in apiutils
"""Convert a list to a python list of the correct type."""
prrealpre = pr.replace("&lt;", "<").replace("&gt;", ">")
prreal = prrealpre[prrealpre.index("<") + 1:prrealpre.rindex(">")]
lst = [type_converted_value(i, prreal) for i in val]
return ([i[0] for i in lst], lst[0][1])
def type_converted_value(val, pr):
# TODO exists in apiutils
"""Convert val to the correct type which is indicated by the database
type string in pr.
Returns a tuple with two entries:
- the converted value
- True if the value has to be interpreted as an id acting as a reference
"""
if val is None:
return (None, False)
elif pr == db.DOUBLE:
return (float(val), False)
elif pr == db.BOOLEAN:
return (bool(val), False)
elif pr == db.INTEGER:
return (int(val), False)
elif pr == db.TEXT:
return (val, False)
elif pr == db.FILE:
return (int(val), True)
elif pr == db.REFERENCE:
return (int(val), True)
elif pr == db.DATETIME:
return (val, False)
elif pr == db.TIMESPAN:
return (val, False)
elif pr[0:4] == "LIST":
return type_converted_list(val, pr)
else:
return (int(val), True)
"""
allows to create a revisionOf record
"""
def id_query(ids):
q = "FIND Entity with " + " OR ".join(["id={}".format(id) for id in ids])
return db.execute_query(q)
def retrieve_multiple_entities(entities):
collection = db.Container()
step = 20
for i in range(len(entities)//step+1):
collection.extend(id_query(entities[i:i+step]))
return collection
def get_ids_of_related_entities(entity):
entities = []
for par in entity.parents:
entities.append(par.id)
for prop in entity.properties:
entities.append(prop.id)
value, isref = type_converted_value(prop.value, prop.datatype)
if isref:
if isinstance(value, list):
entities.extend(value)
else:
entities.append(value)
return entities
def recursively_collect_related(entity):
all_entities = db.Container()
all_entities.append(entity)
ids = set([entity.id])
new_entities = [entity]
while new_entities:
new_ids = set()
for ent in new_entities:
new_ids.update(get_ids_of_related_entities(ent))
new_ids = [id for id in new_ids if id not in ids]
new_entities = retrieve_multiple_entities(list(new_ids))
ids.update([e.id for e in new_entities])
all_entities.extend(new_entities)
return all_entities
def invert_ids(entity):
apply_to_ids(entity, lambda x: x*-1)
def apply_to_ids(entity, func):
entity.id = func(entity.id)
for par in entity.parents:
par.id = func(par.id)
for prop in entity.properties:
prop.id = func(prop.id)
value, isref = type_converted_value(prop.value, prop.datatype)
if isref:
if isinstance(value, list):
prop.value = [func(el) for el in prop.value]
else:
prop.value = func(prop.value)
def main(rec_id):
ent = db.execute_query("FIND {}".format(rec_id), unique=True)
cont = recursively_collect_related(ent)
for ent in cont:
invert_ids(ent)
xml = etree.tounicode(cont.to_xml(
local_serialization=True), pretty_print=True)
with open("test.txt", "w") as fi:
fi.write(xml)
def defineParser():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'-i',
'--id',
type=int,
help='the id of the record that shall be copied and then changed')
return parser
if __name__ == "__main__":
parser = defineParser()
args = parser.parse_args()
main(args.id)
import argparse
from tempfile import NamedTemporaryFile
from caosmodels.data_model import DataModel
from lxml import etree
import caosdb as db
from export_related import apply_to_ids
def main(filename):
cont = db.Container()
with open(filename) as fi:
cont = cont.from_xml(fi.read())
tmpfile = NamedTemporaryFile(delete=False)
tmpfile.close()
model = []
for el in cont:
if isinstance(el, db.File):
el.file = tmpfile.name
if (isinstance(el, db.Property) or isinstance(el, db.RecordType)):
model.append(el)
for el in model:
cont.remove(el)
for el in model:
if el.name is not None:
ret = db.execute_query("FIND {}".format(el.name))
# TODO incorrect
# if len(ret) > 0 and not isinstance(ret[0], type(el)):
# print("The following entities have the same name as the current one:")
# for r in ret:
# print(r)
# if "y" == input("Do you want to insert\n"+str(el)):
# el.insert(unique=False)
id_mapping = {}
for el in model:
id_mapping[el.id] = el
datamodel = DataModel()
datamodel.extend(model)
datamodel.sync_data_model()
def replace_by_new(old):
if old in id_mapping:
return id_mapping[old].id
else:
return old
print(cont[:5])
for el in cont:
apply_to_ids(el, replace_by_new)
print(cont[:5])
cont.insert()
def defineParser():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("file", help='file to be imported')
return parser
if __name__ == "__main__":
parser = defineParser()
args = parser.parse_args()
main(args.file)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment