crawl.py

    #!/usr/bin/env python3
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2021 Henrik tom Wörden
    #               2021 Alexander Schlemmer
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    
    """
    Data that is contained in a hierarchical structure is converted to a data
    structure that is consistent with a predefined semantic data model.
    
    The hierarchical sturcture can be for example a file tree. However it can be
    also something different like the contents of a json file or a file tree with
    json files.
    
    
    This hierarchical structure is assumed to be consituted of a tree of
    StructureElements. The tree is created on the fly by so called Converters which
    are defined in a yaml file. The tree of StructureElements is a model
    of the existing data (For example could a tree of Python file objects
    (StructureElements) represent a file tree that exists on some file server).
    
    Converters treat StructureElements and thereby create the StructureElement that
    are the children of the treated StructureElement. Converters therefore create
    the above named tree. The definition of a Converter also contains what
    Converters shall be used to treat the generated child-StructureElements. The
    definition is there a tree itself. (Question: Should there be global Converters
    that are always checked when treating a StructureElement? Should Converters be
    associated with generated child-StructureElements? Currently, all children are
    created and checked against all Converters. It could be that one would like to
    check file-StructureElements against one set of Converters and
    directory-StructureElements against another)
    
    Each StructureElement in the tree has a set of data values, i.e a dictionary of
    key value pairs.
    Some of those values are set due to the kind of StructureElement. For example,
    a file could have the file name as such a key value pair: 'filename': <sth>.
    Converters may define additional functions that create further values. For
    example, a regular expresion could be used to get a date from a file name.
    
    
    """
    
    import sys
    import os
    import yaml
    import argparse
    from argparse import RawTextHelpFormatter
    import caosdb as db
    from caosdb.common.datatype import is_reference
    from .stores import GeneralStore, RecordStore
    from .identified_cache import IdentifiedCache
    from .structure_elements import StructureElement, Directory
    from .converters import Converter, DirectoryConverter
    from .identifiable_adapters import IdentifiableAdapter, LocalStorageIdentifiableAdapter
    from collections import defaultdict
    from typing import Union, Any, Optional
    from caosdb.apiutils import compare_entities
    from copy import deepcopy
    
    
    class Crawler(object):
        """
        Crawler class that encapsulates crawling functions.
        Furthermore, it keeps track of the storage for records (record store)
        and the storage for values (general store).
        """
    
        def __init__(self, converters: Optional[list[Converter]] = None,
                     generalStore: Optional[GeneralStore] = None,
                     debug: bool = False,
                     identifiableAdapter: Optional[IdentifiableAdapter] = None):
            """
            Create a new crawler and initialize an empty RecordStore and GeneralStore.
    
            converters: The set of converters used for this crawler.
            recordStore: An initial GeneralStore which might store e.g. environment variables.
    
            debug: Create a debugging information tree when set to True.
                   The debugging information tree is a variable stored in
                   self.debug_tree. It is a dictionary mapping directory entries
                   to a tuple of general stores and record stores which are valid for the directory scope.
                   Furthermore, it is stored in a second tree named self.debug_copied whether the
                   objects in debug_tree had been copied from a higher level in the hierarchy
                   of the structureelements.
            """
            self.global_converters = converters if converters is not None else []
    
            self.identified_cache = IdentifiedCache()
            self.recordStore = RecordStore()
    
            self.generalStore = generalStore
            if generalStore is None:
                self.generalStore = GeneralStore()
    
            self.identifiableAdapter = identifiableAdapter
            if identifiableAdapter is None:
                self.identifiableAdapter = LocalStorageIdentifiableAdapter()
    
            self.debug = debug
            if self.debug:
                # order in the tuple:
                # 0: generalStore
                # 1: recordStore
                self.debug_tree: dict[str, tuple] = dict()
                self.debug_metadata: dict[str, dict] = dict()
                self.debug_metadata["copied"] = dict()
                self.debug_metadata["provenance"] = defaultdict(lambda: dict())
                self.debug_metadata["usage"] = defaultdict(lambda: set())
    
        def crawl_directory(self, dirname: str, crawler_definition_path: str):
            """ Crawl a single directory.
    
            Convenience function that starts the crawler (calls start_crawling)
            with a single directory as the StructureElement.
            """
    
            # Load the cfood from a yaml file:
            with open(crawler_definition_path, "r") as f:
                crawler_definition = yaml.load(f, Loader=yaml.SafeLoader)
    
            self.start_crawling(Directory(os.path.basename(dirname),
                                          dirname),
                                crawler_definition)
    
        @staticmethod
        def create_local_converters(crawler_definition: dict):
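            """
            Create the list of top-level converters from a crawler definition
            dictionary. The special "Definitions" key is skipped; every other
            top-level key defines one converter.
            """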
            local_converters = []
    
            for key, value in crawler_definition.items():
                if key == "Definitions":
                    continue
                local_converters.append(Converter.converter_factory(value, key))
    
            return local_converters
    
        def start_crawling(self, item: StructureElement,
                           crawler_definition: dict):
            """
            Start point of the crawler recursion.
    
            item: A structure element that is used for generating the initial items for the crawler.
                  This could e.g. be a Directory.
            crawler_definition: A dictionary representing the crawler definition, possibly from a yaml
                  file.
    
            Return the final update list.
            """
    
            # This function builds the tree of converters out of the crawler definition.
    
            if not isinstance(item, Directory):
                raise NotImplementedError("Currently only directories are supported as items.")
    
            if self.generalStore is None:
                raise RuntimeError("Should not happen.")
    
            local_converters = Crawler.create_local_converters(crawler_definition)
            # This recursive crawling procedure generates the update list:
            self.updateList: list[db.Record] = []
            self._crawl(DirectoryConverter.create_children_from_directory(item),
                        self.global_converters, local_converters, self.generalStore, self.recordStore,
                        [], [])
    
            if self.debug:
                self.debug_converters = self.global_converters + local_converters
    
            return self.updateList
    
        def synchronize(self):
            """
            Carry out the actual synchronization.
            """
    
            # After the crawling, the actual synchronization with the database, based on the
            # update list is carried out:
    
            return self._synchronize(self.updateList)
    
        def can_be_checked_externally(self, record: db.Record):
            """
            Returns False if there is at least one property in record which:
            a) is a reference property AND
            b) where the value is set to a db.Entity (instead of an ID) AND
            c) where the ID of the value is not set (to an integer)
    
            Returns True otherwise.
            """
            for p in record.properties:
                # TODO: implement for lists?
                if (is_reference(p) and isinstance(p.value, db.Entity)
                        and p.value.id is None):
                    return False
            return True
    
        def create_flat_list(self, ent_list: list[db.Entity], flat: list[db.Entity]):
            """
            Recursively add all entities that are referenced by properties of the
            entities in ent_list to the output list flat. Each entity will only be
            added to the list once.
            """
            for ent in ent_list:
                for p in ent.properties:
                    # TODO: implement for lists?
                    if isinstance(p.value, db.Entity):
                        if p.value not in flat:
                            flat.append(p.value)
                        self.create_flat_list([p.value], flat)
    
        def all_references_are_existing_already(self, record):
            """
            Returns True if all references either have IDs or were already checked
            remotely and not found (i.e. they exist in the local cache).
            """
            for p in record.properties:
                if (is_reference(p)
                    # Entity instead of ID and not cached locally
                    # TODO: implement for lists?
                        and isinstance(p.value, db.Entity)
                    and p.value.id is None
                        and self.get_identified_record_from_local_cache(p.value) is None):
                    # might be checked when reference is resolved
                    return False
            return True
    
        def get_identified_record_from_local_cache(self, record: db.Record):
            """
            Returns the cached record if an identifiable with the same values
            already exists locally, and None otherwise.
            (Each identifiable that is not found on the remote server is 'cached'
            locally to prevent the same identifiable from being created twice.)
            """
            identifiable = self.identifiableAdapter.get_identifiable(record)
            if identifiable is None:
                return None
            if identifiable in self.identified_cache:
                return self.identified_cache[identifiable]
            else:
                return None
    
        def add_identified_record_to_local_cache(self, record: db.Record):
            """
            Adds the identifiable of the given record to the local cache.

            No identifiable with the same values may already exist locally.
            (Each identifiable that is not found on the remote server is 'cached'
            locally to prevent the same identifiable from being created twice.)
            """
            identifiable = self.identifiableAdapter.get_identifiable(record)
            if identifiable is None:
                raise RuntimeError()
            self.identified_cache.add(identifiable=identifiable, record=record)
    
        def copy_attributes(self, fro: db.Entity, to: db.Entity):
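            """
            Copy the attributes of entity fro over to entity to (used to merge
            duplicate records). Not implemented yet.
            """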
            raise NotImplementedError()
    
        def split_into_inserts_and_updates(self, ent_list: list[db.Entity]):
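            """
            Split ent_list into a list of entities that have to be inserted and a
            list of entities that have to be updated on the remote server.

            References are resolved iteratively: in every pass, each record that
            can be decided (found in the local cache, checkable against the remote
            server, or whose references are all known) is removed from the work
            list, until a fixed point is reached.
            """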
            to_be_inserted = []
            to_be_updated = []
            flat = list(ent_list)
            # ensure all entities are direct members
            # TODO: Can this be removed at some point? Check only?
            self.create_flat_list(ent_list, flat)
    
            # TODO: can the following be removed at some point
            for ent in flat:
                if len(ent.parents) == 0:
                    raise RuntimeError("Records must have a parent.")
    
            resolved_references = True
            # flat contains Entities which could not yet be checked against the remote server
            while resolved_references and len(flat) > 0:
                resolved_references = False
    
                for i in reversed(range(len(flat))):
                    record = flat[i]
    
                    # TODO remove if the exception is never raised
                    if (record.id is not None or record in to_be_inserted):
                        raise Exception("This should not be reached since treated elements are removed"
                                        " from the list")
                    # Check the local cache first for duplicate
                    elif self.get_identified_record_from_local_cache(record) is not None:
                        # This record is a duplicate that can be removed. Make sure we do not lose
                        # information
                        # Update an (local) identified record that will be inserted
                        self.copy_attributes(
                            fro=record, to=self.get_identified_record_from_local_cache(record))
                        del flat[i]
                        continue
    
                    # all references need to be IDs that exist on the remote server
                    elif self.can_be_checked_externally(record):
    
                        # Check remotely
                        identified_record = self.identifiableAdapter.retrieve_identifiable(
                            deepcopy(record))
                        if identified_record is None:
                            # identifiable does not exist remote
                            to_be_inserted.append(record)
                            self.add_identified_record_to_local_cache(record)
                            del flat[i]
                        else:
                            # side effect
                            record.id = identified_record.id
                            to_be_updated.append(record)
                            # TODO think this through
                            # self.add_identified_record_to_local_cache(record)
                            del flat[i]
                        resolved_references = True
    
                    # e.g. references an identifiable that does not exist remotely
                    elif self.all_references_are_existing_already(record):
                        to_be_inserted.append(record)
                        self.add_identified_record_to_local_cache(record)
                        del flat[i]
                        resolved_references = True
            if len(flat) > 0:
                raise RuntimeError("Could not resolve all Entity references. Circular Dependency?")
    
            return to_be_inserted, to_be_updated
    
        def replace_entities_by_ids(self, rec: db.Record):
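            """
            Replace all db.Entity property values of rec (including entities
            inside list values) by their IDs, in place.
            """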
            for el in rec.properties:
                if isinstance(el.value, db.Entity):
                    el.value = el.value.id
                elif isinstance(el.value, list):
                    for index, val in enumerate(el.value):
                        if isinstance(val, db.Entity):
                            el.value[index] = val.id
    
        def remove_unnecessary_updates(self, updateList: list[db.Record]):
            """
            checks whether all relevant attributes (especially Property values) are equal
            """
            for i in reversed(range(len(updateList))):
                record = updateList[i]
                identifiable = self.identifiableAdapter.retrieve_identifiable(record)
    
                comp = compare_entities(record, identifiable)
                identical = True
                for j in range(2):
                    # TODO: should be implemented elsewhere (?)
                    for label in ("parents", ):
                        if len(comp[j][label]) > 0:
                            identical = False
                            break
                    if not identical:
                        break
                for key in comp[0]["properties"]:
                    for attribute in ("datatype", "importance", "unit"):
    
                        if attribute in comp[0]["properties"][key]:
                            attr_val = comp[0]["properties"][key][attribute]
                            other_attr_val = (comp[1]["properties"][key][attribute]
                                              if attribute in comp[1]["properties"][key] else None)
                            if attr_val is not None and attr_val != other_attr_val:
                                identical = False
                                break
    
                    if "value" in comp[0]["properties"][key]:
                        identical = False
    
                    if not identical:
                        break
    
                if identical:
                    del updateList[i]
    
        def _synchronize(self, updateList: list[db.Record]):
            """
            This function applies several stages:
            1) Retrieve identifiables for all records in updateList.
            2) Compare updateList with existing records.
            3) Insert and update records based on the set of identified differences.
    
            This function makes use of an IdentifiableAdapter, which is used to
            register and retrieve identifiables.
    
            Return the final insertList and updateList as tuple.
            """
    
            if self.identifiableAdapter is None:
                raise RuntimeError("Should not happen.")
    
            to_be_inserted, to_be_updated = self.split_into_inserts_and_updates(updateList)
    
            # remove unnecessary updates from list
            for el in to_be_updated:
                self.replace_entities_by_ids(el)
    
            self.remove_unnecessary_updates(to_be_updated)
    
            # TODO
            # self.execute_inserts_in_list(to_be_inserted)
            # self.execute_updates_in_list(to_be_updated)
    
            return (to_be_inserted, to_be_updated)
    
        @staticmethod
        def debug_build_usage_tree(converter: Converter):
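            """
            Recursively build a nested dictionary that maps each converter name to
            its usage information and the subtree of its subconverters.
            """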
            res: dict[str, dict[str, Any]] = {
                converter.name: {
                    "usage": ", ".join(converter.metadata["usage"]),
                    "subtree": {}
                }
            }
    
            for subconv in converter.converters:
                d = Crawler.debug_build_usage_tree(subconv)
                k = list(d.keys())
                if len(k) != 1:
                    raise RuntimeError("Unkonwn error during building of usage tree.")
                res[converter.name]["subtree"][k[0]] = d[k[0]]
            return res
    
        def save_debug_data(self, filename: str):
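            """
            Write the collected debug metadata (provenance and usage information
            as well as the converter usage trees) to a YAML file.
            """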
            paths: dict[str, Union[dict, list]] = dict()
    
            def flatten_debug_info(key):
                mod_info = self.debug_metadata[key]
                paths[key] = dict()
                for record_name in mod_info:
                    if key == "provenance":
                        paths[key][record_name] = dict()
                        for prop_name in mod_info[record_name]:
                            paths[key][record_name][prop_name] = {
                                "structure_elements_path": "/".join(
                                    mod_info[record_name][prop_name][0]),
                                "converters_path": "/".join(
                                    mod_info[record_name][prop_name][1])}
                    elif key == "usage":
                        paths[key][record_name] = ", ".join(mod_info[record_name])
            for key in ("provenance", "usage"):
                flatten_debug_info(key)
    
            paths["converters_usage"] = [self.debug_build_usage_tree(
                cv) for cv in self.debug_converters]
    
            with open(filename, "w") as f:
                f.write(yaml.dump(paths, sort_keys=False))
    
        def _crawl(self, items: list[StructureElement],
                   global_converters: list[Converter],
                   local_converters: list[Converter],
                   generalStore: GeneralStore,
                   recordStore: RecordStore,
                   structure_elements_path: list[str], converters_path: list[str]):
            """
            Crawl a list of StructureElements and apply any matching converters.
    
            items: structure elements (e.g. files and folders on one level of the hierarchy)
            global_converters and local_converters: globally or locally defined converters for
                                treating structure elements. A locally defined converter could be
                                one that is only valid for a specific subtree of the originally
                                crawled StructureElement structure.
            generalStore and recordStore: Each recursion of the crawl function should only operate
                                on copies of the global stores of the Crawler object.
            """
            for element in items:
                for converter in global_converters + local_converters:
                    # type is something like "matches files", replace isinstance with "type_matches"
                    # match function tests regexp for example
                    if (converter.typecheck(element) and
                            converter.match(element) is not None):
                        generalStore_copy = generalStore.create_scoped_copy()
                        recordStore_copy = recordStore.create_scoped_copy()
                        # extracts values from structure element and stores them in the converter
                        # this could e.g. be the storage of a variable from the regexp in the
                        # converter object or the extraction from values from a file and storage
                        # in the converter object
                        # -> rather store it in the variable storage than in the converter?
                        converter.create_values(generalStore_copy, element)
    
                        keys_modified = converter.create_records(
                            generalStore_copy, recordStore_copy, element)
    
                        children = converter.create_children(generalStore_copy, element)
                        if self.debug:
                        # add provenance information for each variable
                            self.debug_tree[str(element)] = (
                                generalStore_copy.get_storage(), recordStore_copy.get_storage())
                            self.debug_metadata["copied"][str(element)] = (
                                generalStore_copy.get_dict_copied(), recordStore_copy.get_dict_copied())
                            self.debug_metadata["usage"][str(element)].add(
                                "/".join(converters_path + [converter.name]))
                            mod_info = self.debug_metadata["provenance"]
                            for record_name, prop_name in keys_modified:
                                # TODO: check
                                internal_id = recordStore_copy.get_internal_id(record_name)
                                record_identifier = record_name + "_" + str(internal_id)
                                converter.metadata["usage"].add(record_identifier)
                                mod_info[record_identifier][prop_name] = (structure_elements_path + [element.get_name()],
                                                                          converters_path + [converter.name])
    
                        self._crawl(children, global_converters, converter.converters,
                                    generalStore_copy, recordStore_copy,
                                    structure_elements_path + [element.get_name()],
                                    converters_path + [converter.name])
            # When the crawler runs out of this scope, copy all records in the
            # recordStore that were created in this scope to the general update list.
            scoped_records = recordStore.get_records_current_scope()
            self.updateList.extend(scoped_records)
            return self.updateList
    
    
    def main(*args):
        pass
    
    
    def parse_args():
        parser = argparse.ArgumentParser(description=__doc__,
                                         formatter_class=RawTextHelpFormatter)
        parser.add_argument("path",
                            help="the subtree of files below the given path will "
                            "be considered. Use '/' for everything.")
    
        return parser.parse_args()
    
    
    if __name__ == "__main__":
        args = parse_args()
        sys.exit(main(args))