diff --git a/sample-management-custom/caosdb-server/scripting/bin/crawl_sample_data_async.py b/sample-management-custom/caosdb-server/scripting/bin/crawl_sample_data_async.py index 7d99c3a234ba4b4b805138485b18b589a7014f15..3c3c09335377d068f01c03b7d299199c87c52d71 100755 --- a/sample-management-custom/caosdb-server/scripting/bin/crawl_sample_data_async.py +++ b/sample-management-custom/caosdb-server/scripting/bin/crawl_sample_data_async.py @@ -47,8 +47,9 @@ from bis_utils import (get_do_not_insert_type_names, return_value_if_not_none, send_mail_with_defaults, SPECIAL_TREATMENT_SAMPLE) +from sample_helpers.sample_upload_get_person import get_person from sample_helpers.sample_upload_post_processing import post_process_samples -from sample_helpers.utils import get_column_header_name +from sample_helpers.utils import get_column_header_name, get_entity_name # suppress warning of diff function apilogger = logging.getLogger("linkahead.apiutils") @@ -396,48 +397,6 @@ def get_nagoya_case(data): return nagoya_case -def get_person(text) -> db.Record: - """ - Return the BIS ID of the person that is specifed as 'Main User' or 'Sampling Person' in data. - - If the Person is not present in the database, an Exception is raised. Creating new Person Reconrds can only be done by a priviledged user. - """ - # Check in which format the person is identified: - person_identifier = text.split(", ") - if len(person_identifier) == 1: - person = _get_person_by_abbreviation(person_identifier[0]) - else: - person = _get_person_by_fullname(person_identifier[1], person_identifier[0]) - - return person - - -def _get_person_by_fullname(first_name, last_name): - # seach for person in db - res = db.execute_query( - "FIND RECORD Person WITH 'First name' = '{}' AND 'Last name' = '{}'".format(first_name, last_name)) - # if person doesn't exist in database... - if len(res) == 0: - # There is not enought data in the template to create a new Person record. Hence, we have to raise an Exception - error_msg = "There is no person Record with 'First name' = '{}' AND 'Last name' = '{}' in the database. ".format( - first_name, last_name) - raise DataInconsistencyError(error_msg) - else: - return res[0] - - -def _get_person_by_abbreviation(abbreviation): - # seach for person in db - res = db.execute_query("FIND RECORD Person WITH 'Abbreviation' = '{}'".format(abbreviation)) - # if person doesn't exist in database... - if len(res) == 0: - # There is not enought data in the template to create a new Person record. Hence, we have to raise an Exception - error_msg = "There is no Person Record with Abbreviation = '{}'".format(abbreviation) - raise DataInconsistencyError(error_msg) - else: - return res[0] - - def synchroize(records, additional_property_ents, htmluserlog_public): crawler = Crawler(securityMode=SecurityMode.UPDATE) identifiables_definition_file = os.path.expanduser("~/identifiables.yml") diff --git a/sample-management-custom/caosdb-server/scripting/bin/sample_helpers/sample_upload_get_person.py b/sample-management-custom/caosdb-server/scripting/bin/sample_helpers/sample_upload_get_person.py new file mode 100644 index 0000000000000000000000000000000000000000..49c34f69d15ff5741696d21c7304a99f431b3064 --- /dev/null +++ b/sample-management-custom/caosdb-server/scripting/bin/sample_helpers/sample_upload_get_person.py @@ -0,0 +1,70 @@ +# +# Copyright (C) 2025 Indiscale GmbH <info@indiscale.com> +# Copyright (C) 2025 Florian Spreckelsen <f.spreckelsen@indiscale.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +import linkahead as db + +from caosadvancedtools.datainconsistency import DataInconsistencyError + +from .utils import get_entity_name + + +def get_person(text: str) -> db.Record: + """Return the Person Record that is specifed as 'Main User' or + 'Sampling Person' in data. + + If the Person is not present in the database, an Exception is + raised. Creating new Person Reconrds can only be done by a + priviledged user. + + """ + # Check in which format the person is identified: + person_identifier = text.split(", ") + if len(person_identifier) == 1: + person = _get_person_by_id(person_identifier[0]) + else: + person = _get_person_by_fullname(person_identifier[1], person_identifier[0]) + + return person + + +def _get_person_by_fullname(first_name: str, last_name: str) -> db.Record: + # seach for person in db + res = db.execute_query( + f"FIND '{get_entity_name('Person')}' WITH '{ + get_entity_name('first_name_prop')}'='{first_name}' " + f"AND '{get_entity_name('last_name_prop')}'='{last_name}'" + ) + # if person doesn't exist in database... + if len(res) == 0: + # There is not enought data in the template to create a new Person record. Hence, we have to raise an Exception + error_msg = ( + f"There is no person Record with '{get_entity_name('first_name_prop')}' = '{ + first_name}' " + f"AND '{get_entity_name('last_name_prop')}' = '{last_name}' in the database. ") + raise DataInconsistencyError(error_msg) + else: + return res[0] + + +def _get_person_by_id(eid: str) -> db.Record: + + try: + return db.get_entity_by_id(eid, role=get_entity_name("Person")) + except db.EmptyUniqueQueryError: + raise DataInconsistencyError( + f"Could not find a {get_entity_name('Person')} with ID {eid}." + )