scanner.py

    #!/usr/bin/env python3
    # encoding: utf-8
    #
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2023 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    # Alexander Schlemmer <alexander.schlemmer@ds.mpg.de>
    #
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    
    """
    This is the scanner.
    
    This is where the ``_crawl(...)`` function from ``crawl.py`` was formerly located.

    This module contains only the functionality that extracts data from the file system.
    """
    
    from __future__ import annotations
    
    import importlib
    import logging
    import os
    import warnings
    from collections.abc import Callable
    from typing import Any, Optional, Type, Union
    
    import linkahead as db
    import yaml
    from importlib_resources import files
    from jsonschema import validate
    
    from .converters import Converter
    from .debug_tree import DebugTree
    from .stores import GeneralStore, RecordStore
    from .structure_elements import Directory, StructureElement
    from .version import check_cfood_version
    
    logger = logging.getLogger(__name__)
    
    
    def load_definition(crawler_definition_path: str):
        """
        Load a cfood from a crawler definition defined by
        crawler definition path and validate it using cfood-schema.yml.
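
        Example (a minimal sketch; the file name is only illustrative)::

            definition = load_definition("example_cfood.yml")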
        """
    
        # Load the cfood from a yaml file:
        with open(crawler_definition_path, "r") as f:
            crawler_definitions = list(yaml.safe_load_all(f))
    
        crawler_definition = _load_definition_from_yaml_dict(
            crawler_definitions)
    
        return _resolve_validator_paths(crawler_definition, crawler_definition_path)
    
    
    def _load_definition_from_yaml_dict(crawler_definitions: list[dict]):
        """Load crawler definitions from a list of (yaml) dicts `crawler_definitions` which
        contains either one or two documents.
    
        Doesn't resolve the validator paths in the cfood definition, so for
        internal and testing use only.
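
        Example (a minimal sketch of the two-document form; the converter definition shown
        is only illustrative)::

            crawler_definitions = [
                {"metadata": {}},                 # first document: optional metadata
                {"DataDir": {"type": "Directory",
                             "match": ".*"}},     # second document: the actual cfood
            ]
            definition = _load_definition_from_yaml_dict(crawler_definitions)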
    
        """
        if len(crawler_definitions) == 1:
            # Simple case, just one document:
            crawler_definition = crawler_definitions[0]
            metadata = {}
        elif len(crawler_definitions) == 2:
            metadata = crawler_definitions[0].get("metadata", {})
            crawler_definition = crawler_definitions[1]
        else:
            raise RuntimeError(
                "Crawler definition must not contain more than two documents.")
    
        check_cfood_version(metadata)
    
        # TODO: at this point this function can already load the cfood schema extensions
        #       from the crawler definition and add them to the yaml schema that will be
        #       tested in the next lines of code:
    
        # Load the cfood schema:
        with open(str(files('caoscrawler').joinpath('cfood-schema.yml')), "r") as f:
            schema = yaml.safe_load(f)
    
        # Add custom converters to converter enum in schema:
        if "Converters" in crawler_definition:
            for key in crawler_definition["Converters"]:
                schema["cfood"]["$defs"]["converter"]["properties"]["type"]["enum"].append(
                    key)
        if len(crawler_definitions) == 2:
            if "Converters" in metadata:
                for key in metadata["Converters"]:
                    schema["cfood"]["$defs"]["converter"]["properties"]["type"]["enum"].append(
                        key)
            # TODO: We need a similar thing for "Transformers".
    
        # Validate the cfood schema:
        validate(instance=crawler_definition, schema=schema["cfood"])
    
        return crawler_definition
    
    
    def _resolve_validator_paths(definition: dict, definition_path: str):
        """Resolve path to validation files with respect to the file in which
        the crawler was defined.
    
        """
    
        for key, value in definition.items():
    
            if key == "validate" and isinstance(value, str):
                # Validator is given by a path
                if not value.startswith('/'):
                    # Not an absolute path
                    definition[key] = os.path.join(os.path.dirname(definition_path), value)
                    if not os.path.isfile(definition[key]):
                        # TODO(henrik) capture this in `crawler_main` similar to
                        # `ConverterValidationError`.
                        raise FileNotFoundError(
                            f"Couldn't find validation file {definition[key]}")
            elif isinstance(value, dict):
                # Recursively resolve all validators
                definition[key] = _resolve_validator_paths(value, definition_path)
    
        return definition
    
    
    def create_converter_registry(definition: dict):
        """
        Currently the converter registry is a dictionary containing one entry per converter:

        - the key is the short code, an abbreviation of the converter class name
        - "package" is the name of the module to be imported, which must be installed
        - "converter" is the name of the converter class to load
        - "class" is the loaded converter class associated with this converter entry

        Formerly known as "load_converters".

        All other information for a converter needs to be included in the converter plugin
        directory:

        - schema.yml file
        - README.md documentation
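
        Example showing how a "Converters" section extends the registry (a sketch; the
        converter name, package and class below are only illustrative)::

            definition = {
                "Converters": {
                    "CustomTable": {
                        "package": "my_package.my_converters",
                        "converter": "CustomTableConverter",
                    }
                }
            }
            registry = create_converter_registry(definition)
            # registry["CustomTable"]["class"] now holds the imported converter class.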
        """
    
        # Defaults for the converter registry:
        with open(str(files('caoscrawler').joinpath('default_converters.yml')), "r") as f:
            converter_registry: dict[str, dict[str, str]] = yaml.safe_load(f)
    
        # More converters from definition file:
        if "Converters" in definition:
            for key, entry in definition["Converters"].items():
                if key in ["Dict", "DictTextElement", "DictIntegerElement", "DictBooleanElement",
                           "DictDictElement", "DictListElement", "DictFloatElement"]:
                    warnings.warn(DeprecationWarning(f"{key} is deprecated. Please use the new"
                                                     " variant; without 'Dict' prefix or "
                                                     "'DictElement' in case of 'Dict'"))
    
                converter_registry[key] = {
                    "converter": entry["converter"],
                    "package": entry["package"]
                }
    
        # Load modules and associate classes:
        for key, value in converter_registry.items():
            module = importlib.import_module(value["package"])
            value["class"] = getattr(module, value["converter"])
        return converter_registry
    
    
    def create_transformer_registry(definition: dict[str, dict[str, str]]):
        """
        Currently the transformer registry is a dictionary containing one entry per transformer:

        - the key is the short code, an abbreviation of the transformer function name
        - "package" is the name of the module to be imported, which must be installed
        - "function" is the name of the transformer function to load and associate with this
          transformer entry

        All other information for the transformer needs to be included in the transformer plugin
        directory:

        - schema.yml file
        - README.md documentation

        Please refer to the docstring of the function "scanner" for more information about the
        detailed structure of the transformer functions.
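
        Example showing how a "Transformers" section extends the registry (a sketch; the
        transformer name, package and function below are only illustrative)::

            definition = {
                "Transformers": {
                    "capitalize": {
                        "package": "my_package.my_transformers",
                        "function": "capitalize",
                    }
                }
            }
            registry = create_transformer_registry(definition)
            # registry["capitalize"] now holds the imported transformer function.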
        """
    
        # Defaults for the transformer registry:
        with open(str(files('caoscrawler').joinpath('default_transformers.yml')), "r") as f:
            transformer_def: dict[str, dict[str, str]] = yaml.safe_load(f)
    
        registry: dict[str, Callable[[Any, dict], Any]] = {}
        # More transformers from definition file:
        if "Transformers" in definition:
            for key, entry in definition["Transformers"].items():
                transformer_def[key] = {
                    "function": entry["function"],
                    "package": entry["package"]
                }
    
        # Load modules and associate classes:
        for key, value in transformer_def.items():
            module = importlib.import_module(value["package"])
            registry[key] = getattr(module, value["function"])
        return registry
    
    
    def initialize_converters(crawler_definition: dict, converter_registry: dict):
        """
        Takes the cfood as a dict (``crawler_definition``) and creates the converter objects
        that are defined on the highest level. Child converters will in turn be created
        during the initialization of their parent converters.
        """
        converters = []
    
        for key, value in crawler_definition.items():
            # Definitions and Converters are reserved keywords
            # on the top level of the yaml file.
            # TODO: there should also be a top level keyword for the actual
            #       CFood to avoid confusion between top level keywords
            #       and the CFood.
            if key == "Definitions":
                continue
            elif key == "Converters":
                continue
            elif key == "Transformers":
                continue
            converters.append(Converter.converter_factory(
                value, key, converter_registry))
    
        return converters
    
    # --------------------------------------------------------------------------------
    # Main scanner function:
    # --------------------------------------------------------------------------------
    
    
    def scanner(items: list[StructureElement],
                converters: list[Converter],
                general_store: Optional[GeneralStore] = None,
                record_store: Optional[RecordStore] = None,
                structure_elements_path: Optional[list[str]] = None,
                converters_path: Optional[list[str]] = None,
                restricted_path: Optional[list[str]] = None,
                crawled_data: Optional[list[db.Record]] = None,
                debug_tree: Optional[DebugTree] = None,
                registered_transformer_functions: Optional[dict] = None) -> list[db.Record]:
        """Crawl a list of StructureElements and apply any matching converters.
    
        Formerly known as ``_crawl(...)``.
    
        Parameters
        ----------
        items: list[StructureElement]
            structure_elements (e.g. files and folders on one level on the hierarchy)
    
        converters: list[Converter]
            locally defined converters for treating structure elements. A locally
            defined converter could be one that is only valid for a specific subtree
            of the originally crawled StructureElement structure.
    
        general_store, record_store: GeneralStore, RecordStore, optional
            This recursion of the crawl function should only operate on copies of
            the global stores of the Crawler object.
    
        restricted_path : list[str], optional
            traverse the data tree only along the given path. For example, when a
            directory contains files a, b and c, and b is given as ``restricted_path``, a
            and c will be ignored by the crawler. When the end of the given path is
            reached, traverse the full tree as normal. The first element of the list
            provided by ``restricted_path`` should be the name of the StructureElement
            at this level, i.e. denoting the respective element in the items
            argument.
    
        registered_transformer_functions : dict, optional
            A dictionary of transformer functions that can be used in the "transform" block
            of a converter and that allow applying simple transformations to variables extracted
            by the current converter or to other variables found in the current variable store.

            Each entry of the dictionary consists of:

            - a key, the name under which the function is looked up in the dictionary of
              registered transformer functions, and
            - a value, the function itself, which needs to be of the form:

                def func(in_value: Any, in_parameters: dict) -> Any:
                    pass
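
            Example showing such a dictionary (a sketch; the ``capitalize`` function below is
            only illustrative)::

                def capitalize(in_value: Any, in_parameters: dict) -> Any:
                    # in_parameters holds additional parameters for the transformation.
                    return str(in_value).capitalize()

                registered_transformer_functions = {"capitalize": capitalize}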
    
        """
        # This path_found variable stores whether the path given by restricted_path was found
        # in the data tree.
        path_found = False
        if restricted_path is not None and len(restricted_path) == 0:
            restricted_path = None
    
        if crawled_data is None:
            crawled_data = []
    
        if general_store is None:
            general_store = GeneralStore()
    
        if record_store is None:
            record_store = RecordStore()
    
        if structure_elements_path is None:
            structure_elements_path = []
    
        if converters_path is None:
            converters_path = []
    
        for element in items:
            element_path = os.path.join(*(structure_elements_path + [str(element.get_name())]))
            logger.debug(f"Dealing with {element_path}")
            for converter in converters:
    
                # typecheck tests the type of the structure element (e.g. whether it matches
                # files); the match function tests e.g. a regular expression on its name.
                if (converter.typecheck(element) and (
                        restricted_path is None or element.name == restricted_path[0])
                        and converter.match(element) is not None):
                    path_found = True
                    general_store_copy = general_store.create_scoped_copy()
                    record_store_copy = record_store.create_scoped_copy()
    
                    # Create an entry for this matched structure element that contains the path:
                    general_store_copy[converter.name] = element_path
    
                    # extracts values from structure element and stores them in the
                    # variable store.
                    converter.create_values(general_store_copy, element)
    
                    # Apply transformers if there are any:
                    converter.apply_transformers(general_store_copy,
                                                 registered_transformer_functions)
    
                    keys_modified = converter.create_records(
                        general_store_copy, record_store_copy, element)
    
                    children = converter.create_children(general_store_copy, element)
    
                    if debug_tree is not None:
                        # add provenance information for each variable
                        debug_tree.debug_tree[str(element)] = (
                            general_store_copy.get_storage(), record_store_copy.get_storage())
                        debug_tree.debug_metadata["copied"][str(element)] = (
                            general_store_copy.get_dict_copied(),
                            record_store_copy.get_dict_copied())
                        debug_tree.debug_metadata["usage"][str(element)].add(
                            "/".join(converters_path + [converter.name]))
                        mod_info = debug_tree.debug_metadata["provenance"]
                        for record_name, prop_name in keys_modified:
                            # TODO: check
                            internal_id = record_store_copy.get_internal_id(
                                record_name)
                            record_identifier = record_name + \
                                "_" + str(internal_id)
                            converter.metadata["usage"].add(record_identifier)
                            mod_info[record_identifier][prop_name] = (
                                structure_elements_path + [element.get_name()],
                                converters_path + [converter.name])
    
                    scanner(children, converter.converters,
                            general_store_copy, record_store_copy,
                            structure_elements_path + [element.get_name()],
                            converters_path + [converter.name],
                            restricted_path[1:] if restricted_path is not None else None,
                            crawled_data, debug_tree,
                            registered_transformer_functions)
    
        if restricted_path and not path_found:
            raise RuntimeError("A 'restricted_path' argument was given that is not contained in "
                               "the data tree")
        # When the crawler leaves this scope, copy all records from the record_store that
        # were created in this scope to the general update container.
        scoped_records = record_store.get_records_current_scope()
        for record in scoped_records:
            crawled_data.append(record)
    
        return crawled_data
    
    
    # --------------------------------------------------------------------------------
    # Main scanning interface functions:
    # --------------------------------------------------------------------------------
    
    
    def scan_directory(dirname: str, crawler_definition_path: str,
                       restricted_path: Optional[list[str]] = None,
                       debug_tree: Optional[DebugTree] = None):
        """ Crawl a single directory.
    
        Formerly known as "crawl_directory".
    
        Convenience function that starts the crawler (calls ``scan_structure_elements``)
        with a single directory as the StructureElement.
    
        Parameters
        ----------
    
        dirname : str
            The directory to be crawled.
        crawler_definition_path : str
            Path to the crawler definition (cfood) yaml file.
        restricted_path : list[str], optional
            Traverse the data tree only along the given path. When the end of the given path
            is reached, traverse the full tree as normal. See the docstring of 'scanner' for
            more details.
    
        Returns
        -------
        crawled_data : list
            the final list with the target state of Records.
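
        Example (a sketch; the directory and cfood paths below are only illustrative)::

            crawled_data = scan_directory("/data/experiment_01",
                                          "cfoods/example_cfood.yml")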
        """
    
        crawler_definition = load_definition(crawler_definition_path)
        # Load and register converter packages:
        converter_registry = create_converter_registry(crawler_definition)
    
        # Load and register transformer functions:
        registered_transformer_functions = create_transformer_registry(crawler_definition)
    
        if not dirname:
            raise ValueError(
                "You have to provide a non-empty path for crawling.")
        dir_structure_name = os.path.basename(dirname)
    
        # TODO: needs to be covered somewhere else
        crawled_directory = dirname
        if not dir_structure_name and dirname.endswith('/'):
            if dirname == '/':
                # Crawling the entire file system
                dir_structure_name = "root"
            else:
                # dirname had a trailing '/'
                dir_structure_name = os.path.basename(dirname[:-1])
    
        return scan_structure_elements(Directory(dir_structure_name,
                                                 dirname),
                                       crawler_definition,
                                       converter_registry,
                                       restricted_path=restricted_path,
                                       debug_tree=debug_tree,
                                       registered_transformer_functions=registered_transformer_functions
                                       )
    
    
    def scan_structure_elements(items: Union[list[StructureElement], StructureElement],
                                crawler_definition: dict,
                                converter_registry: dict,
                                restricted_path: Optional[list[str]] = None,
                                debug_tree: Optional[DebugTree] = None,
                                registered_transformer_functions: Optional[dict] = None) -> (
                                    list[db.Record]):
        """
        Start point of the crawler recursion.
    
        Formerly known as "start_crawling".
    
        Parameters
        ----------
        items: list
             A list of structure elements (or a single StructureElement) that is used for
             generating the initial items for the crawler. This could e.g. be a Directory.
        crawler_definition : dict
             A dictionary representing the crawler definition, possibly from a yaml
             file.
        restricted_path: list[str], optional
             Traverse the data tree only along the given path. When the end of the
             given path is reached, traverse the full tree as normal. See docstring
             of 'scanner' for more details.
    
        Returns
        -------
        crawled_data : list[db.Record]
            the final list with the target state of Records.
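
        Example (a sketch; the paths below are only illustrative)::

            crawler_definition = load_definition("cfoods/example_cfood.yml")
            converter_registry = create_converter_registry(crawler_definition)
            crawled_data = scan_structure_elements(
                Directory("experiment_01", "/data/experiment_01"),
                crawler_definition,
                converter_registry)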
        """
    
        # This function builds the tree of converters out of the crawler definition.
        if not isinstance(items, list):
            items = [items]
    
        # TODO: needs to be covered somewhere else
        # self.run_id = uuid.uuid1()
        converters = initialize_converters(crawler_definition, converter_registry)
    
        return scanner(
            items=items,
            converters=converters,
            restricted_path=restricted_path,
            debug_tree=debug_tree,
            registered_transformer_functions=registered_transformer_functions
        )