#!/usr/bin/env python3
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2021 Henrik tom Wörden
# 2021 Alexander Schlemmer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""
Data that is contained in a hierarchical structure is converted to a data
structure that is consistent with a predefined semantic data model.
The hierarchical sturcture can be for example a file tree. However it can be
also something different like the contents of a json file or a file tree with
json files.
This hierarchical structure is assumed to be consituted of a tree of
StructureElements. The tree is created on the fly by so called Converters which
are defined in a yaml file. The tree of StructureElements is a model
of the existing data (For example could a tree of Python file objects
(StructureElements) represent a file tree that exists on some file server).
Converters treat StructureElements and thereby create the StructureElement that
are the children of the treated StructureElement. Converters therefore create
the above named tree. The definition of a Converter also contains what
Converters shall be used to treat the generated child-StructureElements. The
definition is there a tree itself. (Question: Should there be global Converters
that are always checked when treating a StructureElement? Should Converters be
associated with generated child-StructureElements? Currently, all children are
created and checked against all Converters. It could be that one would like to
check file-StructureElements against one set of Converters and
directory-StructureElements against another)
Each StructureElement in the tree has a set of data values, i.e a dictionary of
key value pairs.
Some of those values are set due to the kind of StructureElement. For example,
a file could have the file name as such a key value pair: 'filename': <sth>.
Converters may define additional functions that create further values. For
example, a regular expresion could be used to get a date from a file name.
"""
import sys
import os
import yaml
import argparse
from argparse import RawTextHelpFormatter
import caosdb as db
from caosdb.common.datatype import is_reference
from .stores import GeneralStore, RecordStore
from .identified_cache import IdentifiedCache
from .structure_elements import StructureElement, Directory
from .converters import Converter, DirectoryConverter
from .identifiable_adapters import IdentifiableAdapter, LocalStorageIdentifiableAdapter
from collections import defaultdict
from typing import Union, Any, Optional
from caosdb.apiutils import compare_entities
from copy import deepcopy
class Crawler(object):
"""
Crawler class that encapsulates crawling functions.
Furthermore it keeps track of the storage for records (record store) and the
storage for values (general store).
"""
def __init__(self, converters: list[Converter] = [],
generalStore: Optional[GeneralStore] = None,
debug: bool = False,
                 identifiableAdapter: Optional[IdentifiableAdapter] = None):
"""
Create a new crawler and initialize an empty RecordStore and GeneralStore.
converters: The set of converters used for this crawler.
        generalStore: An initial GeneralStore which might store e.g. environment variables.
debug: Create a debugging information tree when set to True.
The debugging information tree is a variable stored in
self.debug_tree. It is a dictionary mapping directory entries
to a tuple of general stores and record stores which are valid for the directory scope.
        Furthermore, a second tree, self.debug_metadata["copied"], stores whether the
        objects in debug_tree were copied from a higher level in the hierarchy
        of the StructureElements.
"""
self.global_converters = converters
self.identified_cache = IdentifiedCache()
self.recordStore = RecordStore()
self.generalStore = generalStore
if generalStore is None:
self.generalStore = GeneralStore()
self.identifiableAdapter = identifiableAdapter
if identifiableAdapter is None:
self.identifiableAdapter = LocalStorageIdentifiableAdapter()
self.debug = debug
if self.debug:
# order in the tuple:
# 0: generalStore
# 1: recordStore
self.debug_tree: dict[str, tuple] = dict()
self.debug_metadata: dict[str, dict] = dict()
self.debug_metadata["copied"] = dict()
self.debug_metadata["provenance"] = defaultdict(lambda: dict())
self.debug_metadata["usage"] = defaultdict(lambda: set())
def crawl_directory(self, dirname: str, crawler_definition_path: str):
""" Crawl a single directory.
Convenience function that starts the crawler (calls start_crawling)
        with a single directory as the StructureElement.
"""
# Load the cfood from a yaml file:
with open(crawler_definition_path, "r") as f:
crawler_definition = yaml.load(f, Loader=yaml.SafeLoader)
self.start_crawling(Directory(os.path.basename(dirname),
dirname),
crawler_definition)
@staticmethod
def create_local_converters(crawler_definition: dict):
local_converters = []
for key, value in crawler_definition.items():
if key == "Definitions":
continue
local_converters.append(Converter.converter_factory(value, key))
return local_converters
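    # A rough, illustrative sketch of the expected crawler_definition (keys and values
    # are hypothetical, not a documented schema): every top-level key except
    # "Definitions" is turned into a local converter via Converter.converter_factory:
    #
    #     crawler_definition = {
    #         "Definitions": {...},            # skipped above
    #         "ExampleDirConverter": {...},    # passed to Converter.converter_factory
    #     }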
def start_crawling(self, item: StructureElement,
crawler_definition: dict):
"""
Start point of the crawler recursion.
item: A structure element that is used for generating the initial items for the crawler.
This could e.g. be a Directory.
crawler_definition: A dictionary representing the crawler definition, possibly from a yaml
file.
Return the final update list.
"""
# This function builds the tree of converters out of the crawler definition.
if not isinstance(item, Directory):
raise NotImplementedError("Currently only directories are supported as items.")
if self.generalStore is None:
raise RuntimeError("Should not happen.")
local_converters = Crawler.create_local_converters(crawler_definition)
# This recursive crawling procedure generates the update list:
self.updateList: list[db.Record] = []
self._crawl(DirectoryConverter.create_children_from_directory(item),
self.global_converters, local_converters, self.generalStore, self.recordStore,
[], [])
if self.debug:
self.debug_converters = self.global_converters + local_converters
return self.updateList
def synchronize(self):
"""
Carry out the actual synchronization.
"""
# After the crawling, the actual synchronization with the database, based on the
# update list is carried out:
return self._synchronize(self.updateList)
def can_be_checked_externally(self, record: db.Record):
"""
Returns False if there is at least one property in record which:
a) is a reference property AND
b) where the value is set to a db.Entity (instead of an ID) AND
c) where the ID of the value is not set (to an integer)
Returns True otherwise.
"""
for p in record.properties:
# TODO: implement for lists?
if (is_reference(p) and isinstance(p.value, db.Entity)
and p.value.id is None):
return False
return True
def create_flat_list(self, ent_list: list[db.Entity], flat: list[db.Entity]):
"""
Recursively adds all properties contained in entities from ent_list to
the output list flat. Each element will only be added once to the list.
"""
for ent in ent_list:
for p in ent.properties:
# TODO: implement for lists?
if isinstance(p.value, db.Entity):
if p.value not in flat:
flat.append(p.value)
self.create_flat_list([p.value], flat)
def all_references_are_existing_already(self, record):
"""
        Returns True if all references either have IDs or were checked remotely, not found
        there, and are therefore present in the local cache.
"""
for p in record.properties:
if (is_reference(p)
# Entity instead of ID and not cached locally
# TODO: implement for lists?
and isinstance(p.value, db.Entity)
and p.value.id is None
and self.get_identified_record_from_local_cache(p.value) is None):
# might be checked when reference is resolved
return False
return True
def get_identified_record_from_local_cache(self, record: db.Record):
"""
        Returns the cached record if an identifiable with the same values already exists locally.
        (Each identifiable that is not found on the remote server is 'cached' locally to prevent
        the same identifiable from existing twice.)
"""
identifiable = self.identifiableAdapter.get_identifiable(record)
if identifiable is None:
return None
if identifiable in self.identified_cache:
return self.identified_cache[identifiable]
else:
return None
def add_identified_record_to_local_cache(self, record: db.Record):
"""
        Adds the identifiable of the given record to the local cache.
        No identifiable with the same values may already exist locally.
        (Each identifiable that is not found on the remote server is 'cached' locally to prevent
        the same identifiable from existing twice.)
"""
identifiable = self.identifiableAdapter.get_identifiable(record)
if identifiable is None:
raise RuntimeError()
self.identified_cache.add(identifiable=identifiable, record=record)
def copy_attributes(self, fro: db.Entity, to: db.Entity):
raise NotImplementedError()
def split_into_inserts_and_updates(self, ent_list: list[db.Entity]):
to_be_inserted = []
to_be_updated = []
flat = list(ent_list)
        # ensure all entities are direct members  TODO: Can this be removed at some point? Check only?
self.create_flat_list(ent_list, flat)
# TODO: can the following be removed at some point
for ent in flat:
if len(ent.parents) == 0:
raise RuntimeError("Records must have a parent.")
resolved_references = True
# flat contains Entities which could not yet be checked against the remote server
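        # Illustration of the loop below with hypothetical records: if record A references
        # record B and B can be identified remotely first, B's id is set as a side effect;
        # in a later pass all of A's references carry ids, so A itself becomes checkable.
        # If a full pass resolves nothing while entries remain, there is a circular dependency.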
while resolved_references and len(flat) > 0:
resolved_references = False
for i in reversed(range(len(flat))):
record = flat[i]
# TODO remove if the exception is never raised
if (record.id is not None or record in to_be_inserted):
raise Exception("This should not be reached since treated elements are removed"
" from the list")
# Check the local cache first for duplicate
elif self.get_identified_record_from_local_cache(record) is not None:
# This record is a duplicate that can be removed. Make sure we do not lose
# information
# Update an (local) identified record that will be inserted
self.copy_attributes(
fro=record, to=self.get_identified_record_from_local_cache(record))
del flat[i]
continue
# all references need to be IDs that exist on the remote server
elif self.can_be_checked_externally(record):
# Check remotely
identified_record = self.identifiableAdapter.retrieve_identifiable(
deepcopy(record))
if identified_record is None:
                        # the identifiable does not exist remotely
to_be_inserted.append(record)
self.add_identified_record_to_local_cache(record)
del flat[i]
else:
# side effect
record.id = identified_record.id
to_be_updated.append(record)
# TODO think this through
# self.add_identified_record_to_local_cache(record)
del flat[i]
resolved_references = True
# e.g. references an identifiable that does not exist remotely
elif self.all_references_are_existing_already(record):
to_be_inserted.append(record)
self.add_identified_record_to_local_cache(record)
del flat[i]
resolved_references = True
if len(flat) > 0:
raise RuntimeError("Could not resolve all Entity references. Circular Dependency?")
return to_be_inserted, to_be_updated
def replace_entities_by_ids(self, rec: db.Record):
for el in rec.properties:
if isinstance(el.value, db.Entity):
el.value = el.value.id
elif isinstance(el.value, list):
for index, val in enumerate(el.value):
if isinstance(val, db.Entity):
el.value[index] = val.id
def remove_unnecessary_updates(self, updateList: list[db.Record]):
"""
        Checks whether all relevant attributes (especially Property values) are equal and
        removes records from updateList whose update would not change anything remotely.
"""
for i in reversed(range(len(updateList))):
record = updateList[i]
identifiable = self.identifiableAdapter.retrieve_identifiable(record)
comp = compare_entities(record, identifiable)
identical = True
for j in range(2):
# TODO: should be implemented elsewhere (?)
for label in ("parents", ):
if len(comp[j][label]) > 0:
identical = False
break
if not identical:
break
for key in comp[0]["properties"]:
for attribute in ("datatype", "importance", "unit"):
if attribute in comp[0]["properties"][key]:
attr_val = comp[0]["properties"][key][attribute]
other_attr_val = (comp[1]["properties"][key][attribute]
if attribute in comp[1]["properties"][key] else None)
                        if attr_val is not None and attr_val != other_attr_val:
identical = False
break
if "value" in comp[0]["properties"][key]:
identical = False
if not identical:
break
            if identical:
                del updateList[i]
def _synchronize(self, updateList: list[db.Record]):
"""
This function applies several stages:
1) Retrieve identifiables for all records in updateList.
2) Compare updateList with existing records.
3) Insert and update records based on the set of identified differences.
        This function makes use of an IdentifiableAdapter which is used to register
        and retrieve identifiables.
        Return the final insertList and updateList as a tuple.
"""
if self.identifiableAdapter is None:
raise RuntimeError("Should not happen.")
to_be_inserted, to_be_updated = self.split_into_inserts_and_updates(updateList)
# remove unnecessary updates from list
for el in to_be_updated:
self.replace_entities_by_ids(el)
self.remove_unnecessary_updates(to_be_updated)
# TODO
# self.execute_inserts_in_list(to_be_inserted)
# self.execute_updates_in_list(to_be_updated)
return (to_be_inserted, to_be_updated)
@staticmethod
def debug_build_usage_tree(converter: Converter):
res: dict[str, dict[str, Any]] = {
converter.name: {
"usage": ", ".join(converter.metadata["usage"]),
"subtree": {}
}
}
for subconv in converter.converters:
d = Crawler.debug_build_usage_tree(subconv)
k = list(d.keys())
if len(k) != 1:
raise RuntimeError("Unkonwn error during building of usage tree.")
res[converter.name]["subtree"][k[0]] = d[k[0]]
return res
def save_debug_data(self, filename: str):
paths: dict[str, Union[dict, list]] = dict()
def flatten_debug_info(key):
mod_info = self.debug_metadata[key]
paths[key] = dict()
for record_name in mod_info:
if key == "provenance":
paths[key][record_name] = dict()
for prop_name in mod_info[record_name]:
paths[key][record_name][prop_name] = {
"structure_elements_path": "/".join(
mod_info[record_name][prop_name][0]),
"converters_path": "/".join(
mod_info[record_name][prop_name][1])}
elif key == "usage":
paths[key][record_name] = ", ".join(mod_info[record_name])
for key in ("provenance", "usage"):
flatten_debug_info(key)
paths["converters_usage"] = [self.debug_build_usage_tree(
cv) for cv in self.debug_converters]
with open(filename, "w") as f:
f.write(yaml.dump(paths, sort_keys=False))
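    # For orientation, the YAML written by save_debug_data roughly has this shape
    # (record, property and converter names are purely illustrative):
    #
    #     provenance:
    #       ExampleRecord_1:
    #         example_property:
    #           structure_elements_path: "ExampleDir/example_file.dat"
    #           converters_path: "ExampleDirConverter/ExampleFileConverter"
    #     usage:
    #       "<ExampleStructureElement>": "ExampleDirConverter/ExampleFileConverter"
    #     converters_usage:
    #     - ExampleDirConverter:
    #         usage: "ExampleRecord_1"
    #         subtree: {}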
def _crawl(self, items: list[StructureElement],
global_converters: list[Converter],
local_converters: list[Converter],
generalStore: GeneralStore,
recordStore: RecordStore,
structure_elements_path: list[str], converters_path: list[str]):
"""
Crawl a list of StructureElements and apply any matching converters.
        items: structure_elements (e.g. files and folders on one level of the hierarchy)
global_converters and local_converters: globally or locally defined converters for
treating structure elements. A locally defined converter could be
one that is only valid for a specific subtree of the originally
                    crawled StructureElement structure.
generalStore and recordStore: This recursion of the crawl function should only operate on copies of the
global stores of the Crawler object.
"""
for element in items:
for converter in global_converters + local_converters:
# type is something like "matches files", replace isinstance with "type_matches"
# match function tests regexp for example
if (converter.typecheck(element) and
converter.match(element) is not None):
generalStore_copy = generalStore.create_scoped_copy()
recordStore_copy = recordStore.create_scoped_copy()
# extracts values from structure element and stores them in the converter
# this could e.g. be the storage of a variable from the regexp in the
                    # converter object or the extraction of values from a file and their storage
# in the converter object
# -> rather store it in the variable storage than in the converter?
converter.create_values(generalStore_copy, element)
keys_modified = converter.create_records(
generalStore_copy, recordStore_copy, element)
children = converter.create_children(generalStore_copy, element)
if self.debug:
                    # add provenance information for each variable
self.debug_tree[str(element)] = (
generalStore_copy.get_storage(), recordStore_copy.get_storage())
self.debug_metadata["copied"][str(element)] = (
generalStore_copy.get_dict_copied(), recordStore_copy.get_dict_copied())
self.debug_metadata["usage"][str(element)].add(
"/".join(converters_path + [converter.name]))
mod_info = self.debug_metadata["provenance"]
for record_name, prop_name in keys_modified:
# TODO: check
internal_id = recordStore_copy.get_internal_id(record_name)
record_identifier = record_name + "_" + str(internal_id)
converter.metadata["usage"].add(record_identifier)
mod_info[record_identifier][prop_name] = (structure_elements_path + [element.get_name()],
converters_path + [converter.name])
self._crawl(children, global_converters, converter.converters,
generalStore_copy, recordStore_copy,
structure_elements_path + [element.get_name()],
converters_path + [converter.name])
        # When the crawler leaves this scope, copy all records from the recordStore that were
        # created in this scope to the general update container.
scoped_records = recordStore.get_records_current_scope()
for record in scoped_records:
self.updateList.append(record)
return self.updateList
def main(*args):
pass
def parse_args():
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=RawTextHelpFormatter)
parser.add_argument("path",
help="the subtree of files below the given path will "
"be considered. Use '/' for everything.")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
    sys.exit(main(args))