Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
sync_node.py 10.05 KiB
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Henrik tom Wörden
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#


from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Optional, Union

import linkahead as db
import yaml
from linkahead.common.models import Parent, _ParentList, _Properties
from warnings import warn

from .exceptions import ImpossibleMergeError

if TYPE_CHECKING:
    from .identifiable import Identifiable

logger = logging.getLogger(__name__)


class TempID(int):
    """Integer subtype that marks negative, temporary IDs.

    Using a dedicated type makes it possible to tell temporary IDs apart from string IDs, since a
    string ID might itself look like a negative integer.
    """


class SyncNode(db.Entity):
    """Represent the information of an Entity as it shall be created in LinkAhead.

    The following information is taken from a db.Entity object during initialization or when the
    object is updated using the `update` member function:
    - id
    - role
    - path
    - file
    - name
    - description
    - parents
    - properties

    Typically, this class is used in the following way:
    1. A SyncNode is initialized with a db.Entity object.
    2. The SyncNode object is possibly updated one or more times with other SyncNode objects.
    3. A db.Entity object is created (`export_entity`) that contains the combined information.
    """

    # Scalar attributes that `update` merges and `export_entity` copies. They are kept in a single
    # place so that both methods always handle the same set in the same order.
    _SCALAR_ATTRIBUTES = ("id", "role", "path", "file", "name", "description")

    def __init__(
        self, entity: db.Entity, registered_identifiable: Optional[db.RecordType] = None,
        **kwargs
    ):
        """Create a SyncNode from the information stored in ``entity``.

        Parameters
        ----------
        entity : db.Entity
            The entity whose id, role, path, file, name, description, parents and properties are
            taken over.
        registered_identifiable : db.RecordType, optional
            Stored unchanged in the attribute of the same name.
        **kwargs
            Passed on to the constructor of the base class.
        """
        super().__init__(name=entity.name,
                         id=entity.id,
                         description=entity.description,
                         **kwargs)
        # db.Entity properties
        self.role = entity.role
        self.path = entity.path
        self.file = entity.file
        # NOTE(review): this relies on `extend` of the linkahead list classes returning the list
        # itself (unlike `list.extend`, which returns None) -- confirm against the linkahead
        # version in use.
        self.parents = _ParentList().extend(entity.parents)
        self.properties = _Properties().extend(entity.properties)
        self._check_for_multiproperties()
        # other members
        self.identifiable: Optional[Identifiable] = None
        self.registered_identifiable = registered_identifiable

    def update(self, other: SyncNode) -> None:
        """Update this node with the information of the given ``other`` SyncNode.

        Parents are added if they are not yet in the list.
        Properties are added in any case. This may lead to duplication of properties.
        We allow this duplication here and remove it when we create a db.Entity (export_entity
        function) because if property values are SyncNode objects, they might not be comparable (no
        ID, no identifiable) yet.

        Parameters
        ----------
        other : SyncNode
            The node whose information is merged into this one.

        Raises
        ------
        ValueError
            If both nodes have an identifiable and their representations differ.
        ImpossibleMergeError
            If one of id, role, path, file, name or description is set on both nodes with
            differing values. Attributes that were handled before the conflicting one (and the
            identifiable) have already been updated at that point.
        """

        if other.identifiable is not None and self.identifiable is not None:
            if (
                other.identifiable.get_representation()
                != self.identifiable.get_representation()
            ):
                raise ValueError(
                    "The SyncNode that is used with update must have an equivalent"
                    " identifiable. I.e. you cannot merge entities with differing identifiables."
                    " The identifiables were:\n"
                    f"{self.identifiable._create_hashable_string(self.identifiable)}\n"
                    f"and\n{other.identifiable._create_hashable_string(other.identifiable)}."
                )

        if other.identifiable:
            self.identifiable = other.identifiable
        for attr in self._SCALAR_ATTRIBUTES:
            theirs = getattr(other, attr)
            if theirs is None:
                # nothing to take over
                continue
            mine = getattr(self, attr)
            if mine is None:
                setattr(self, attr, theirs)
            elif mine != theirs:
                # never silently override an existing value
                raise ImpossibleMergeError(
                    f"Trying to update {attr} but this would lead to an "
                    f"override of the value '{mine}' "
                    f"by the value '{theirs}'",
                    pname=attr, values=(mine, theirs)
                )
        for p in other.parents:
            if not parent_in_list(p, self.parents):
                self.parents.append(p)
        for p in other.properties:
            self.properties.append(p)

    @staticmethod
    def _comparison_key(element: Any) -> Any:
        """Return the ID of a SyncNode that has one; return any other element unchanged."""
        if isinstance(element, SyncNode) and element.id is not None:
            return element.id
        return element

    def export_entity(self) -> db.Entity:
        """Create a db.Entity object from this SyncNode.

        Properties are only added once (based on id or name). If values do not match, an Error is
        raised. If values are SyncNode objects with IDs, they are considered equal if their IDs are
        equal.

        Returns
        -------
        db.Entity
            A db.Record or db.File (depending on the role) carrying the scalar attributes, the
            parents and the de-duplicated properties of this node.

        Raises
        ------
        RuntimeError
            If the role is neither "Record" nor "File".
        ImpossibleMergeError
            If a property occurs more than once with conflicting values.
        """
        if self.role == "Record":
            ent = db.Record()
        elif self.role == "File":
            ent = db.File()
        else:
            raise RuntimeError("Invalid role")
        for attr in self._SCALAR_ATTRIBUTES:
            setattr(ent, attr, getattr(self, attr))
        for p in self.parents:
            ent.add_parent(p)
        for p in self.properties:
            existing: Any = ent.get_property(p)
            if existing is None:
                # first occurrence of this property
                ent.add_property(id=p.id, name=p.name, value=p.value)
                continue

            # The property was added before: the values of both occurrences must agree.
            entval = existing.value
            pval = p.value
            # A list value and a scalar value never match, not even a one-element list.
            unequal = isinstance(entval, list) != isinstance(pval, list)
            # Wrap scalars such that both values can be compared element by element.
            if not isinstance(entval, list):
                entval = [entval]
            if not isinstance(pval, list):
                pval = [pval]
            if len(entval) != len(pval):
                unequal = True
            elif any(
                self._comparison_key(e_el) != self._comparison_key(p_el)
                for e_el, p_el in zip(entval, pval)
            ):
                unequal = True

            if unequal:
                logger.error(
                    "The Crawler is trying to create an entity,"
                    " but there are conflicting property values.\n"
                    f"Problematic Property: {p.name}\n"
                    f"First value:\n{entval}\n"
                    f"Second value:\n{pval}\n"
                    f"{self}"
                )
                raise ImpossibleMergeError(
                    "Cannot merge Entities", pname=p.name, values=(entval, pval)
                )
        return ent

    @staticmethod
    def _brief(node: SyncNode) -> dict[str, Any]:
        """Return id, name, path and parent names of ``node`` as a plain dict."""
        return {
            "id": node.id,
            "name": node.name,
            "path": node.path,
            "parents": [el.name for el in node.parents],
        }

    def __repr__(self) -> str:
        """Return a somewhat concise text representation of the SyncNode.

        The representation contains the role, a YAML dump of id, name, path and parent names and a
        YAML dump of the property values. Property values that are SyncNode objects are shown
        by their id, name, path and parent names only.
        """
        res = f"\n=====================================================\n{self.role}\n"
        res += yaml.dump(self._brief(self), allow_unicode=True)
        res += "---------------------------------------------------\n"
        res += "properties:\n"
        d: dict[str, Any] = {}
        for p in self.properties:
            values = p.value if isinstance(p.value, list) else [p.value]
            # NOTE: if a property name occurs more than once (which `update` permits), only the
            # values of the last occurrence are shown.
            d[p.name] = [
                self._brief(el) if isinstance(el, SyncNode) else el
                for el in values
            ]

        return (
            res
            + yaml.dump(d, allow_unicode=True)
            + "=====================================================\n"
        )

    def _check_for_multiproperties(self) -> None:
        """Warn if multiproperties are present.

        A warning is issued for every property whose name was already seen and for every property
        whose id was already seen; a property that repeats both triggers two warnings.
        """
        ids = set()
        names = set()
        for p in self.properties:
            if p.name is not None:
                if p.name in names:
                    warn("Multiproperties are not supported by the crawler.")
                names.add(p.name)
            if p.id is not None:
                if p.id in ids:
                    warn("Multiproperties are not supported by the crawler.")
                ids.add(p.id)


def parent_in_list(parent: Parent, plist: _ParentList) -> bool:
    """Check whether ``plist`` already contains the given parent.

    Every identifier that is set on ``parent`` must be known to the list: a name that is not
    None has to be a key of ``plist._element_by_name`` and an id that is not None has to be
    (as a string) a key of ``plist._element_by_id``. A parent with neither name nor id is
    reported as contained.
    """
    name_known = parent.name is None or parent.name in plist._element_by_name
    # the id lookup is done with the string form of the id
    id_known = parent.id is None or str(parent.id) in plist._element_by_id
    return name_known and id_known


def property_in_list(prop: db.Property, plist: _Properties) -> bool:
    """Check whether ``plist`` already contains the given property.

    Every identifier that is set on ``prop`` must be known to the list: a name that is not None
    has to be a key of ``plist._element_by_name`` and an id that is not None has to be (as a
    string) a key of ``plist._element_by_id``. A property with neither name nor id is reported
    as contained.
    """
    name_known = prop.name is None or prop.name in plist._element_by_name
    # the id lookup is done with the string form of the id
    id_known = prop.id is None or str(prop.id) in plist._element_by_id
    return name_known and id_known