Skip to content
Snippets Groups Projects

F awi sams

Merged Florian Spreckelsen requested to merge f-awi-sams into main
1 unresolved thread
1 file
+ 4
1
Compare changes
  • Side-by-side
  • Inline
#
# Copyright (C) 2025 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2025 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import linkahead as db
import pandas as pd
from caosadvancedtools.datainconsistency import DataInconsistencyError
from .sample_upload_get_person import get_person
from .utils import get_column_header_name, get_entity_name, update_property, return_value_if_not_none
def add_event_to_sample(sample: db.Record, data: pd.Series) -> db.Record:
"""Take a given sample and attach an event Record from the
data. Then, return the sample.
"""
_perform_sanity_checks(sample, data)
event_rt = db.get_entity_by_name(get_entity_name("event_rt"), role="RECORDTYPE")
event = db.Record().add_parent(event_rt)
# We performed the sanity checks so we can assume that if the
# Start/Stop Latitude exists, all start/stop data exist.
positions = []
position_prop = db.get_entity_by_name(get_entity_name("Position"))
# Take care of positions
for mode in ["start", "stop"]:
if (get_column_header_name(f"Latitude {mode}") in data and
return_value_if_not_none(data[get_column_header_name(f"Latitude {mode}")]) is not None):
positions.append(
_create_position(
mode=mode,
lat=return_value_if_not_none(data[get_column_header_name(f"Latitude {mode}")]),
lng=return_value_if_not_none(data[get_column_header_name(f"Longitude {mode}")]),
ele=return_value_if_not_none(data[get_column_header_name(f"Elevation {mode}")]),
)
)
if positions:
event = update_property(event, position_prop.id, positions,
datatype=db.LIST(get_entity_name("Position")), property_name=position_prop.name)
# Further event properties
for event_p in ["Biome", "Campaign", "Device", "EventType",
"igsn_doi_prop", "level", "locality_description_prop",
"locality_name_prop", "Sphere"]:
prop = db.get_entity_by_name(get_entity_name(event_p))
if (get_column_header_name(event_p) in data and
return_value_if_not_none(data[get_column_header_name(event_p)]) is not None):
event = update_property(event, prop.id, return_value_if_not_none(
data[get_column_header_name(event_p)]), property_name=prop.name)
# Special treatment for (list of) persons
resp_prop = db.get_entity_by_name(get_entity_name("responsible_person_event"))
if (get_column_header_name("responsible_person_event") in data and
return_value_if_not_none(data[get_column_header_name("responsible_person_event")]) is not None):
resps = return_value_if_not_none(data[get_column_header_name("responsible_person_event")])
if not isinstance(resps, list):
resps = [resps]
recs = [get_person(resp) for resp in resps]
event = update_property(event, resp_prop.id, recs, datatype=db.LIST(
"Person"), property_name=resp_prop.name)
for time_p in ["start_date_prop", "end_date_prop"]:
prop = db.get_entity_by_name(get_entity_name(time_p))
if (get_column_header_name(time_p) in data and
return_value_if_not_none(data[get_column_header_name(time_p)]) is not None):
event = update_property(event, prop.id, return_value_if_not_none(
data[get_column_header_name(time_p)]), property_name=prop.name)
# only add if there was any event data at all:
if len(event.properties) > 0:
sample = update_property(sample, event_rt.id, event, property_name=event_rt.name)
return sample
def _create_position(mode: str, lat: float, lng: float, ele: float):
pos = db.Record()
if mode.lower() == "start":
pos.add_parent(get_entity_name("StartPosition"))
elif mode.lower() == "stop":
pos.add_parent(get_entity_name("StopPosition"))
else:
pos.add_parent(get_entity_name("Position"))
pos.add_property(name=get_entity_name("latitude"), value=lat)
pos.add_property(name=get_entity_name("longitude"), value=lng)
pos.add_property(name=get_entity_name("elevation"), value=ele)
return pos
def _perform_sanity_checks(sample, data):
if (get_column_header_name("end_date") in data and
return_value_if_not_none(data[get_column_header_name("end_date")]) is not None):
if (get_column_header_name("start_date") not in data or
return_value_if_not_none(data[get_column_header_name("start_date")]) is None):
raise DataInconsistencyError(
f"Sample with {get_entity_name('entity_id')} {sample.id} has a "
f"{get_column_header_name('end_date')} but no valid "
f"{get_column_header_name('start_date')}."
)
for name in ["start", "stop"]:
bool_list = [get_column_header_name(f"{val}_{name}") in data for val in [
"latitude", "longitude", "elevation"]]
raise_error = False
if any(bool_list):
if not all(bool_list):
raise_error = True
elif any([return_value_if_not_none(data[get_column_header_name(f"{val}_{name}")]) is None for val in ["latitude", "longitude", "elevation"]]):
raise_error = True
if raise_error:
raise DataInconsistencyError(
f"Sample with {get_entity_name('entity_id')} {sample.id} has an "
f"invalid {name} position. Please make sure that latitude, longitude, "
"and elevation are provided."
)
# only need to check lat since we already checked that if lat is
# present, lng and ele are present, too
if (get_column_header_name("latitude_stop") in data and
return_value_if_not_none(data[get_column_header_name("latitude_stop")]) is not None):
if (get_column_header_name("latitude_start") not in data or
return_value_if_not_none(data[get_column_header_name("latitude_start")]) is not None):
raise DataInconsistencyError(
f"Sample with {get_entity_name('entity_id')} {sample.id} has a "
f"{get_entity_name('StopPosition')} but no valid "
f"{get_entity_name('StartPosition')}."
)
Loading