crawler.py
    #!/usr/bin/env python
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    # Copyright (C) 2020 Indiscale GmbH <info@indiscale.com>
    # Copyright (C) 2020 Henrik tom Wörden
    # Copyright (C) 2020 Florian Spreckelsen <f.spreckelsen@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """ Crawls a file structure and inserts Records into CaosDB based on what is
    found.
    
    CaosDB can automatically be filled with Records based on some file structure.
    The Crawler will iterate over the files and test for each file whether a CFood
    exists that matches the file path. If one does, it is instantiated to treat the
    match. This occurs in basically three steps:
    1. create a list of identifiables, i.e. unique representations of CaosDB Records
       (such as an experiment belonging to a project and a date/time)
    2. the identifiables are either found in CaosDB or they are created
    3. the identifiables are updated based on the data in the file structure
    """
    
    
    import logging
    import os
    import subprocess
    import traceback
    import uuid
    from datetime import datetime
    
    import caosdb as db
    from caosdb.exceptions import TransactionError
    
    from .cache import Cache, UpdateCache, get_pretty_xml
    from .cfood import RowCFood, add_files, get_ids_for_entities_with_names
    from .datainconsistency import DataInconsistencyError
    from .datamodel_problems import DataModelProblems
    from .guard import RETRIEVE, ProhibitedException
    from .guard import global_guard as guard
    from .suppressKnown import SuppressKnown
    
    logger = logging.getLogger("caosadvancedtools")
    
    
    def separated(text):
        return "-"*60 + "\n" + text
    
    
    class Crawler(object):
        def __init__(self, cfood_types, use_cache=False,
                     abort_on_exception=True, interactive=True, hideKnown=False,
                     debug_file=None, cache_file=None):
            """
            Parameters
            ----------
            cfood_types : list of CFood classes
                   The Crawler will use those CFoods when crawling.
            use_cache : bool, optional
                        Whether to use caching (avoid re-inserting objects that
                        probably already exist in CaosDB), defaults to False.
            abort_on_exception : bool, optional
                        If True, exceptions are raised. Otherwise the crawler
                        continues when an exception occurs.
            interactive : boolean, optional
                          If true, questions will be posed during execution of the
                          crawl function.
            hideKnown : bool, optional
                        If True, log messages that were already reported in
                        earlier runs (e.g. match warnings) are suppressed.
                        Defaults to False.
            debug_file : a file where debug output is saved. The path will be
                         printed when a critical error occurred.
            cache_file : a file where the cached identifiables are stored. See
                         cache.py
    
            """
    
            self.cfood_types = cfood_types
            self.interactive = interactive
            self.report = db.Container()
            self.use_cache = use_cache
            self.hideKnown = hideKnown
            self.debug_file = debug_file
            self.abort_on_exception = abort_on_exception
            self.update_cache = UpdateCache()
            self.filterKnown = SuppressKnown()
            logger.addFilter(self.filterKnown)
    
            if hideKnown is False:
                for cat in ["matches", "inconsistency"]:
                    self.filterKnown.reset(cat)
    
            if self.use_cache:
                self.cache = Cache(db_file=cache_file)
    
        def iteritems(self):
            """ generates items to be crawled with an index"""
            yield 0, None
    
        def update_authorized_changes(self, run_id):
            """
            execute the pending updates of a specific run id.
    
            This should be called if the updates of a certain run were authorized.
    
            Parameters:
            -----------
            run_id: the id of the crawler run
            """
            changes = self.update_cache.get_updates(run_id)
    
            for _, _, old, new, _ in changes:
                current = db.Container()
                new_cont = db.Container()
                new_cont = new_cont.from_xml(new)
    
                for ent in new_cont:
                    current.append(db.execute_query("FIND {}".format(ent.id),
                                                    unique=True))
                current_xml = get_pretty_xml(current)
    
                # check whether previous version equals current version
                # if not, the update must not be done
    
                if current_xml != old:
                    continue
    
                new_cont.update()
    
        def collect_cfoods(self):
            """
            This is the first phase of the crawl. It collects all cfoods that shall
            be processed. The second phase is iterating over cfoods and updating
            CaosDB. This separate first step is necessary in order to allow a
            single cfood being influenced by multiple crawled items. E.g. the
            FileCrawler can have a single cfood treat multiple files.
    
            This is a very basic implementation and this function should be
            overwritten by subclasses.
    
            The basic structure of this function should be: whatever is being
            processed is iterated over, and each cfood is checked as to whether
            the item 'matches'. If it does, a cfood is instantiated with the item
            passed as an argument.
            The match can depend on the cfoods that have already been created,
            i.e. a file might no longer match because it is already treated by an
            earlier cfood.

            Should return cfoods, tbs and errors_occured.
            # TODO do this via logging?
            tbs: text returned from traceback
            errors_occured: True if at least one error occurred
            """
            cfoods = []
            tbs = []
            errors_occured = False
            matches = {idx: [] for idx, _ in self.iteritems()}
    
            logger.debug(separated("Matching files against CFoods"))
    
            for Cfood in self.cfood_types:
                logger.debug("Matching against {}...".format(Cfood.__name__))
    
                for idx, item in self.iteritems():
                    if Cfood.match_item(item):
                        try:
                            matches[idx].append(Cfood.__name__)
                            cfoods.append(Cfood(item))
                            logger.debug("{} matched\n{}.".format(
                                    Cfood.__name__,
                                    item))
                        except DataInconsistencyError:
                            pass
                        except Exception as e:
                            logger.debug("Failed during execution of {}!".format(
                                Cfood.__name__))
                            logger.debug(traceback.format_exc())
                            logger.debug(e)
    
                            if self.abort_on_exception:
                                raise e
    
                            errors_occured = True
                            tbs.append(e)
    
            logger.debug(separated("CFoods are collecting information..."))
    
            for cfood in cfoods:
                cfood.collect_information()
    
            logger.debug(separated("Trying to attach further items to created CFoods"))
    
            for cfood in cfoods:
                logger.debug("Matching against {}...".format(Cfood.__name__))
    
                for idx, item in self.iteritems():
                    if cfood.looking_for(item):
                        logger.debug("{} matched\n{}.".format(
                                cfood.__class__.__name__,
                                item))
                        cfood.attach(item)
                        matches[idx].append(cfood.__class__.__name__)
    
            self.check_matches(matches)
    
            return cfoods, tbs, errors_occured
    
        def check_matches(self, matches):
            for idx, item in self.iteritems():
                if len(matches[idx]) == 0:
                    msg = ("The crawler has no matching rules for and is thus "
                           "ignoring:\n{}".format(item))
    
                    logger.warning(msg, extra={"identifier": str(item),
                                                'category': "matches"})
    
                if len(matches[idx]) > 1:
                    msg = ("Attention: More than one matching cfood!\n"
                           + "Tried to match {}\n".format(item)
                           + "\tRecordTypes:\t" + ", ".join(
                                matches[idx])+"\n")
    
                    logger.warning(msg, extra={"identifier": str(item),
                                                'category': "matches"})
    
        def cached_find_identifiables(self, identifiables):
            if self.use_cache:
                hashes = self.cache.update_ids_from_cache(identifiables)
    
            self.find_or_insert_identifiables(identifiables)
    
            if self.use_cache:
                self.cache.insert_list(hashes, identifiables)
    
        def crawl(self, security_level=RETRIEVE, path=None):
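            """
            Crawls all items yielded by iteritems(): matching CFoods are
            collected and the resulting Records are inserted into or updated in
            CaosDB.

            Parameters
            ----------
            security_level : the level for the global guard, e.g. RETRIEVE (the
                             default). Updates that are not permitted at this
                             level are stored as pending changes that need to be
                             authorized (see update_authorized_changes).
            path : the path defining the crawled subtree; it is only used when
                   creating the authorization form for pending changes.
            """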
            self.run_id = uuid.uuid1()
            logger.info("Run Id: " + str(self.run_id))
            guard.set_level(level=security_level)
    
            logger.info("Scanning the objects to be treated...")
            cfoods, tbs, errors_occured = self.collect_cfoods()
    
            if self.interactive and "y" != input("Do you want to continue? (y)"):
                return
    
            logger.info("Inserting or updating Records...")
    
            for cfood in cfoods:
                try:
                    cfood.create_identifiables()
    
                    self.cached_find_identifiables(cfood.identifiables)
    
                    cfood.update_identifiables()
                    self.push_identifiables_to_CaosDB(cfood)
                except DataInconsistencyError:
                    pass
                except Exception as e:
                    logger.info("Failed during execution of {}!".format(
                        cfood.__class__.__name__))
                    logger.debug(traceback.format_exc())
                    logger.debug(e)
    
                    if self.abort_on_exception:
                        raise e
                    errors_occured = True
                    tbs.append(e)
    
            pending_changes = self.update_cache.get_updates(self.run_id)
    
            if pending_changes:
                # Sending an Email with a link to a form to authorize updates is
                # only done in SSS mode
    
                if "SHARED_DIR" in os.environ:
                    filename = self.save_form([el[3] for el in pending_changes], path)
                    self.send_mail([el[3] for el in pending_changes], filename)
    
                for i, el in enumerate(pending_changes):
    
                    logger.debug(
                        """
    UNAUTHORIZED UPDATE ({} of {}):
    ____________________\n""".format(i+1, len(pending_changes)) + str(el[3]))
                logger.info("There where unauthorized changes (see above). An "
                            "email was sent to the curator.\n"
                            "You can authorize the updates by invoking the crawler"
                            " with the run id: {rid}\n".format(rid=self.run_id,
                                                               path=path))
    
            if len(DataModelProblems.missing) > 0:
                err_msg = ("There were problems with one or more RecordType or "
                           "Property. Do they exist in the data model?\n")
    
                for ent in DataModelProblems.missing:
                    err_msg += str(ent) + "\n"
                logger.error(err_msg)
                logger.error('Crawler finished with Datamodel Errors')
            elif errors_occured:
                msg = "There were fatal errors during execution, please "
                "contact the system administrator!"
    
                if self.debug_file:
                    msg += "\nPlease provide the following path:\n{}".format(
                        self.debug_file)
                logger.error(msg)
                logger.error("Crawler terminated with failures!")
                logger.debug(tbs)
            else:
                logger.info("Crawler terminated successfully!")
    
        def save_form(self, changes, path):
            """
            Saves an html website to a file that contains a form with a button to
            authorize the given changes.
    
            The button will call the crawler with the same path that was used for
            the current run and with a parameter to authorize the changes of the
            current run.
    
            Parameters:
            -----------
            changes: The CaosDB entities in the version after the update.
            path: the path defining the subtree that is crawled
    
            """
            from xml.sax.saxutils import escape
            # TODO move path related stuff to sss_helper
            form = """
    <html>
    <head>
      <meta charset="utf-8"/>
      <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
      <meta name="viewport" content="width=device-width, initial-scale=1">
      <title>Crawler</title>
      <link rel="stylesheet" href="{url}/webinterface/css/webcaosdb.css"/>
      <link rel="stylesheet" href="{url}/webinterface/css/bootstrap.css">
      <script src="{url}/webinterface/js/jquery.js"></script>
      <script src="{url}/webinterface/js/loglevel.js"></script>
      <script src="{url}/webinterface/js/bootstrap.js"></script>
      <script src="{url}/webinterface/js/webcaosdb.js"></script>
      <script src="{url}/webinterface/js/plotly.js"></script>
      <script src="{url}/webinterface/js/caosdb.js"></script>
      <script src="{url}/webinterface/js/state-machine.js"></script>
      <script src="{url}/webinterface/js/showdown.js"></script>
      <script src="{url}/webinterface/js/preview.js"></script>
      <script src="{url}/webinterface/js/ext_references.js"></script>
      <script src="{url}/webinterface/js/ext_bottom_line.js"></script>
    </head>
    <body>
    <form method="post" action="{url}/scripting">
        <input type="hidden" name="call" value="crawl.py"/>
        <input type="hidden" name="-p0" value=""/>
        <input type="hidden" name="-p1" value="{path}"/>
        <input type="hidden" name="-Oauthorize-run" value="{rid}"/>
        <input type="submit" value="Authorize"/>
    </form>
    <pre>
    <code>
    {changes}
    </code>
    </pre>
    <script>
            const wrapper = $(`
    <div class="container caosdb-f-main">
      <div class="row caosdb-v-main-col">
        <div class="panel-group caosdb-f-main-entities"></div>
      </div>
    </div>`);
            const code_element = $("code").remove();
            const entities_str = `<Response>${{code_element.text()}}</Response>`;
            const entities = str2xml(entities_str);
            transformation.transformEntities(entities).then((html) => {{
                wrapper.find(".caosdb-f-main-entities").append(html);
                wrapper.find(".caosdb-v-entity-header-buttons-list .glyphicon-comment").hide();
                $(document.body).append(wrapper);
                const message_bodies = $(wrapper).find(".caosdb-messages div div");
                console.log(message_bodies);
    
                for (const body of message_bodies.toArray()) {{
                    const text = body.innerHTML;
                    console.log(text);
                    body.innerHTML = markdown.textToHtml(text);
    
                }}
            }});
        </script>
    </body>
    </html>
    """.format(url=db.configuration.get_config()["Connection"]["url"],
               rid=self.run_id,
               changes=escape("\n".join(changes)),
               path=path)
    
            if "SHARED_DIR" in os.environ:
                directory = os.environ["SHARED_DIR"]
            filename = str(self.run_id)+".html"
            randname = os.path.basename(os.path.abspath(directory))
            filepath = os.path.abspath(os.path.join(directory, filename))
            filename = os.path.join(randname, filename)
            with open(filepath, "w") as f:
                f.write(form)
            return filename
    
        def send_mail(self, changes, filename):
            """ calls sendmail in order to send a mail to the curator about pending
            changes
    
            Parameters:
            -----------
            changes: The CaosDB entities in the version after the update.
            filename: path to the html site that allows the authorization
            """
    
            caosdb_config = db.configuration.get_config()
            text = """Dear Curator,
    there were changes that need your authorization. Please check the following
    carefully and if the changes are ok, click on the following link:
    
    {url}/Shared/{filename}
    
    {changes}
            """.format(url=caosdb_config["Connection"]["url"],
                       filename=filename,
                       changes="\n".join(changes))
            sendmail = caosdb_config["advancedtools"]["sendmail"]
            try:
                fro = caosdb_config["advancedtools"]["crawler.from_mail"]
                to = caosdb_config["advancedtools"]["crawler.to_mail"]
            except KeyError:
                logger.error("Server Configuration is missing a setting for "
                             "sending mails. The administrator should check "
                             "'from_mail' and 'to_mail'.")
                return
    
            p = subprocess.Popen([sendmail, "-f", fro, to], stdin=subprocess.PIPE)
            p.communicate(input=text.encode())
    
        def push_identifiables_to_CaosDB(self, cfood):
            """
            Updates the to_be_updated Container, i.e. pushes the changes to CaosDB
            """
    
            if len(cfood.to_be_updated) == 0:
                return
    
            get_ids_for_entities_with_names(cfood.to_be_updated)
    
            # remove duplicates
            tmp = db.Container()
    
            for el in cfood.to_be_updated:
                if el not in tmp:
                    tmp.append(el)
    
            cfood.to_be_updated = tmp
    
            info = "UPDATE: updating the following entities\n"
    
            for el in cfood.to_be_updated:
                info += "\t" + (el.name if el.name is not None
                                else str(el.id))
                info += "\n"
            logger.info(info)
    
            logger.debug(cfood.to_be_updated)
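            # If the current guard level does not permit updates, safe_update
            # raises ProhibitedException and the change is stored as a pending
            # update that has to be authorized later
            # (see update_authorized_changes).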
            try:
                guard.safe_update(cfood.to_be_updated)
            except ProhibitedException:
                self.update_cache.insert(cfood.to_be_updated, self.run_id)
            except Exception as e:
                DataModelProblems.evaluate_exception(e)
    
        # TODO remove static?
        @staticmethod
        def find_or_insert_identifiables(identifiables):
            """ Sets the ids of identifiables (that do not have already an id from the
            cache) based on searching CaosDB and retrieves those entities.
            The remaining entities (those which can not be retrieved) have no
            correspondence in CaosDB and are thus inserted.
            """
            # looking for matching entities in CaosDB when there is no valid id
            # i.e. there was none set from a cache
    
            for ent in identifiables:
                if ent.id is None or ent.id < 0:
                    logger.debug("Looking for: {}".format(
                        ent.id if ent.id is not None else ent.name))
                    existing = Crawler.find_existing(ent)
    
                    if existing is not None:
                        ent.id = existing.id
                else:
                    logger.debug("Id is known of: {}".format(ent))
    
            # insert missing, i.e. those which are not valid
            missing_identifiables = db.Container()
            missing_identifiables.extend([ent for ent in identifiables
                                          if ent.id is None or ent.id < 0])
            # TODO the following should not be necessary. Fix it
    
            for ent in missing_identifiables:
                ent.id = None
    
            if len(missing_identifiables) > 0:
                info = "Going to insert the following entities:\n"
    
                for ent in missing_identifiables:
                    info += str(ent)+"\n"
                logger.debug(info)
    
            if len(missing_identifiables) == 0:
                logger.debug("No new entities to be inserted.")
            else:
                try:
                    guard.safe_insert(missing_identifiables)
                except Exception as e:
                    DataModelProblems.evaluate_exception(e)
    
            logger.debug("Retrieving entities from CaosDB...")
            identifiables.retrieve(unique=True, raise_exception_on_error=False)
    
        @staticmethod
        def find_existing(entity):
            """searches for an entity that matches the identifiable in CaosDB
    
            Characteristics of the identifiable such as properties, name or id
            are used for the match.
            """
    
            if entity.name is None:
                # TODO multiple parents are ignored! Sufficient?
                query_string = "FIND Record " + entity.get_parents()[0].name
                query_string += " WITH "
    
                for p in entity.get_properties():
                    query_string += ("'" + p.name + "'='" + str(get_value(p))
                                         + "' AND ")
                # remove the last AND
                query_string = query_string[:-4]
            else:
                query_string = "FIND '{}'".format(entity.name)
    
            logger.debug(query_string)
            q = db.Query(query_string)
            # the identifiable should identify an object uniquely. Thus the query
            # is using the unique keyword
            try:
                r = q.execute(unique=True)
            except TransactionError:
                r = None
    
            # if r is not None:
            #     print("Found Entity with id:", r.id)
            # else:
            #     print("Did not find an existing entity.")
    
            return r
    
    
    class FileCrawler(Crawler):
        def __init__(self, files, **kwargs):
            """
            Parameters
            ----------
            files : files to be crawled
    
            """
            super().__init__(**kwargs)
            self.files = files
            add_files({fi.path: fi for fi in files})
    
        def iteritems(self):
            for idx, p in enumerate(sorted([f.path for f in self.files])):
                yield idx, p
    
        @staticmethod
        def query_files(path):
            query_str = "FIND FILE WHICH IS STORED AT " + (
                path if path.endswith("/") else path + "/") + "**"
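            # e.g. path "/ExperimentalData" results in
            # "FIND FILE WHICH IS STORED AT /ExperimentalData/**"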
            q_info = "Sending the following query: '" + query_str + "'\n"
            files = db.execute_query(query_str)
            logger.info(
                q_info + "Found {} files that need to be processed.".format(
                    len(files)))
    
            return files
    
    
    class TableCrawler(Crawler):
    
        def __init__(self, table, unique_cols, recordtype, **kwargs):
            """
            Parameters
            ----------
            table : pandas DataFrame
            unique_cols : the columns that provide the properties for the
                          identifiable
            recordtype : Record Type of the Records to be created
            """
            self.table = table
    
            # TODO I do not like this yet, but I do not see a better way so far.
            class ThisRowCF(RowCFood):
                def __init__(self, item):
                    super().__init__(item, unique_cols, recordtype)
    
            super().__init__(cfood_types=[ThisRowCF], **kwargs)
    
        def iteritems(self):
            for idx, row in self.table.iterrows():
                yield idx, row
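
    # A usage sketch for TableCrawler; column and RecordType names are only
    # illustrative:
    #
    #     import pandas as pd
    #
    #     table = pd.read_csv("measurements.csv")
    #     crawler = TableCrawler(table=table, unique_cols=["date", "sample"],
    #                            recordtype="Measurement", interactive=False)
    #     crawler.crawl()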
    
    
    def get_value(prop):
        """ Returns the value of a Property
    
        Parameters
        ----------
        prop : The property of which the value shall be returned.
    
        Returns
        -------
        out : The value of the property; if the value is an entity, its ID.
    
        """
    
        if isinstance(prop.value, db.Entity):
            return prop.value.id
        elif isinstance(prop.value, datetime):
            return prop.value.isoformat()
        else:
            return prop.value