Skip to content
Snippets Groups Projects
Verified Commit 1c88bf02 authored by Timm Fitschen's avatar Timm Fitschen
Browse files

Merge branch 'f-filesystem-cleanup' into f-filesystem-core

parents 58bef57d 283006ac
Branches
No related tags found
1 merge request!86Draft: ENH: file system: core
Pipeline #47482 failed
Showing
with 87 additions and 7235 deletions
This diff is collapsed.
from linkahead.cached import *
from warnings import warn
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.cached`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
"""Commonly used classes for CaosDB."""
from linkahead.common import *
from warnings import warn
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.common`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""missing docstring."""
from linkahead.common.administration import *
from warnings import warn
import re
import string
import random
from caosdb.common.utils import xml2str
from caosdb.connection.connection import get_connection
from caosdb.exceptions import (EntityDoesNotExistError, HTTPClientError,
HTTPForbiddenError, HTTPResourceNotFoundError,
ServerConfigurationException)
from lxml import etree
def set_server_property(key, value):
"""set_server_property.
Set a server property.
Parameters
----------
key : str
The name of the server property.
value : str
The value of the server property.
Returns
-------
None
"""
con = get_connection()
try:
con._form_data_request(method="POST", path="_server_properties",
params={key: value}).read()
except EntityDoesNotExistError:
raise ServerConfigurationException(
"Debug mode in server is probably disabled.") from None
def get_server_properties():
"""get_server_properties.
Get all server properties as a dict.
Returns
-------
dict
The server properties.
"""
con = get_connection()
try:
body = con._http_request(
method="GET", path="_server_properties")
except EntityDoesNotExistError:
raise ServerConfigurationException(
"Debug mode in server is probably disabled.") from None
xml = etree.parse(body)
props = dict()
for elem in xml.getroot():
props[elem.tag] = elem.text
return props
def get_server_property(key):
"""get_server_property.
Get a server property.
Parameters
----------
key : str
The name of the server property
Returns
-------
value : str
The string value of the server property.
Raises
------
KeyError
If the server property is no defined.
"""
return get_server_properties()[key]
def generate_password(length: int):
"""Create a random password that fulfills the security requirements
Parameters
----------
length : int
Length of the generated password. Has to be greater than 7.
Returns
-------
password : string
Generated random password of the given length
Raises
------
ValueError:
If the length is less than 8.
"""
minimum_password_length = 8
if length < minimum_password_length:
raise ValueError("CaosDB passwords have to be at least {} characters.".format(
minimum_password_length))
sample_letters = string.ascii_letters + string.digits + "!#$%*+-/:;?_"
password = ''.join((random.choice(sample_letters) for i in range(length)))
while not re.match(r"(?=.*[A-Z])(?=.*[a-z])(?=.*\d)(?=.*[\W_]).{8,}",
password):
password = ''.join((random.choice(sample_letters)
for i in range(length)))
return password
def _retrieve_user(name, realm=None, **kwargs):
con = get_connection()
try:
return con._http_request(method="GET", path="User/" + (realm + "/" + name if realm is not None else name), **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to retrieve this user."
raise
except HTTPResourceNotFoundError as e:
e.msg = "User does not exist."
raise
def _delete_user(name, **kwargs):
con = get_connection()
try:
return con._http_request(method="DELETE", path="User/" + name, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to delete this user."
raise
except HTTPResourceNotFoundError as e:
e.msg = "User does not exist."
raise
def _update_user(name, realm=None, password=None, status=None,
email=None, entity=None, **kwargs):
con = get_connection()
params = {}
if password is not None:
params["password"] = password
if status is not None:
params["status"] = status
if email is not None:
params["email"] = email
if entity is not None:
params["entity"] = str(entity)
try:
return con.put_form_data(entity_uri_segment="User/" + (realm + "/" + name if realm is not None else name), params=params, **kwargs).read()
except HTTPResourceNotFoundError as e:
e.msg = "User does not exist."
raise e
except HTTPForbiddenError as e:
e.msg = "You are not permitted to update this user."
raise e
except HTTPClientError as e:
for elem in etree.fromstring(e.body):
if elem.tag == "Error":
e.msg = elem.get("description")
raise
def _insert_user(name, password=None, status=None, email=None, entity=None, **kwargs):
con = get_connection()
params = {"username": name}
if password is not None:
params["password"] = password
if status is not None:
params["status"] = status
if email is not None:
params["email"] = email
if entity is not None:
params["entity"] = entity
try:
return con.post_form_data(entity_uri_segment="User", params=params, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to insert a new user."
raise e
except HTTPClientError as e:
for elem in etree.fromstring(e.body):
if elem.tag == "Error":
e.msg = elem.get("description")
raise e
def _insert_role(name, description, **kwargs):
con = get_connection()
try:
return con.post_form_data(entity_uri_segment="Role", params={"role_name": name, "role_description": description}, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to insert a new role."
raise
except HTTPClientError as e:
if e.status == 409:
e.msg = "Role name is already in use. Choose a different name."
raise
def _update_role(name, description, **kwargs):
con = get_connection()
try:
return con.put_form_data(entity_uri_segment="Role/" + name, params={"role_description": description}, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to update this role."
raise
except HTTPResourceNotFoundError as e:
e.msg = "Role does not exist."
raise
def _retrieve_role(name, **kwargs):
con = get_connection()
try:
return con._http_request(method="GET", path="Role/" + name, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to retrieve this role."
raise
except HTTPResourceNotFoundError as e:
e.msg = "Role does not exist."
raise
def _delete_role(name, **kwargs):
con = get_connection()
try:
return con._http_request(method="DELETE", path="Role/" + name, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to delete this role."
raise
except HTTPResourceNotFoundError as e:
e.msg = "Role does not exist."
raise
def _set_roles(username, roles, realm=None, **kwargs):
xml = etree.Element("Roles")
for r in roles:
xml.append(etree.Element("Role", name=r))
body = xml2str(xml)
con = get_connection()
try:
body = con._http_request(method="PUT", path="UserRoles/" + (realm + "/" +
username if realm is not None else username), body=body, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to set this user's roles."
raise
except HTTPResourceNotFoundError as e:
e.msg = "User does not exist."
raise
except HTTPClientError as e:
if e.status == 409:
e.msg = "Role does not exist."
raise
ret = set()
for r in etree.fromstring(body)[0]:
if r.tag == "Role":
ret.add(r.get("name"))
return ret
def _get_roles(username, realm=None, **kwargs):
con = get_connection()
try:
body = con._http_request(method="GET", path="UserRoles/" + (
realm + "/" + username if realm is not None else username), **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to retrieve this user's roles."
raise
except HTTPResourceNotFoundError as e:
e.msg = "User does not exist."
raise
ret = set()
for r in etree.fromstring(body).xpath('/Response/Roles')[0]:
if r.tag == "Role":
ret.add(r.get("name"))
return ret
def _set_permissions(role, permission_rules, **kwargs):
"""Set permissions for a role.
Parameters
----------
role : str
The role for which the permissions are set.
permission_rules : iterable<PermissionRule>
An iterable with PermissionRule objects.
**kwargs :
Additional arguments which are passed to the HTTP request.
Returns
-------
None
"""
xml = etree.Element("PermissionRules")
for p in permission_rules:
xml.append(p._to_xml())
body = xml2str(xml)
con = get_connection()
try:
return con._http_request(method="PUT", path="PermissionRules/" + role, body=body, **kwargs).read()
except HTTPForbiddenError as e:
e.msg = "You are not permitted to set this role's permissions."
raise
except HTTPResourceNotFoundError as e:
e.msg = "Role does not exist."
raise
def _get_permissions(role, **kwargs):
con = get_connection()
try:
return PermissionRule._parse_body(con._http_request(method="GET", path="PermissionRules/" + role, **kwargs).read())
except HTTPForbiddenError as e:
e.msg = "You are not permitted to retrieve this role's permissions."
raise
except HTTPResourceNotFoundError as e:
e.msg = "Role does not exist."
raise
class PermissionRule():
"""Permission rules.
Parameters
----------
action : str
Either "grant" or "deny"
permission : str
For example "RETRIEVE:*".
priority : bool, optional
Whether the priority shall be set, defaults is False.
"""
@staticmethod
def _parse_boolean(bstr):
return str(bstr) in ["True", "true", "TRUE", "yes"]
def __init__(self, action, permission, priority=False):
self._action = action
self._permission = permission
self._priority = PermissionRule._parse_boolean(priority)
def _to_xml(self):
xml = etree.Element(self._action)
xml.set("permission", self._permission)
if self._priority is True:
xml.set("priority", "true")
return xml
@staticmethod
def _parse_element(elem):
return PermissionRule(elem.tag, elem.get(
"permission"), elem.get("priority"))
@staticmethod
def _parse_body(body):
xml = etree.fromstring(body)
ret = set()
for c in xml:
if c.tag in ["Grant", "Deny"]:
ret.add(PermissionRule._parse_element(c))
return ret
def __str__(self):
return str(self._action) + "(" + str(self._permission) + ")" + \
("P" if self._priority is True else "")
def __repr__(self):
return str(self)
def __hash__(self):
return hash(str(self).lower())
def __eq__(self, other):
return str(other).lower() == str(self).lower()
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.common.administration`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2020 IndiScale GmbH
# Copyright (C) 2020 Henrik tom Wörden, IndiScale GmbH
# Copyright (C) 2020 Daniel Hornung (d.hornung@indiscale.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
import re
from linkahead.common.datatype import *
from warnings import warn
from ..exceptions import EmptyUniqueQueryError, QueryNotUniqueError
DOUBLE = "DOUBLE"
REFERENCE = "REFERENCE"
TEXT = "TEXT"
DATETIME = "DATETIME"
INTEGER = "INTEGER"
FILE = "FILE"
BOOLEAN = "BOOLEAN"
def LIST(datatype):
if hasattr(datatype, "name"):
datatype = datatype.name
return "LIST<" + str(datatype) + ">"
def get_list_datatype(datatype):
""" returns the datatype of the elements in the list """
if not isinstance(datatype, str):
return None
match = re.match("LIST(<|&lt;)(?P<datatype>.*)(>|&gt;)", datatype)
if match is not None:
return match.group("datatype")
else:
return None
def is_list_datatype(datatype):
""" returns whether the datatype is a list """
return get_list_datatype(datatype) is not None
def is_reference(datatype):
"""Returns whether the value is a reference
FILE and REFERENCE properties are examples, but also datatypes that are
RecordTypes.
Parameters
----------
datatype : str
The datatype to check.
Returns
-------
bool
True if the datatype is a not base datatype or a list of a base datatype.
Otherwise False is returned.
"""
if datatype is None:
raise ValueError("Cannot decide whether datatype is reference if None"
" is supplied")
if datatype in [DOUBLE, BOOLEAN, INTEGER, TEXT, DATETIME]:
return False
elif is_list_datatype(datatype):
return is_reference(get_list_datatype(datatype))
else:
return True
def get_referenced_recordtype(datatype):
"""Return the record type of the referenced datatype.
Raises
------
ValueError
In cases where datatype is not a reference, the list does not have
a referenced record type or the datatype is a FILE.
Parameters
----------
datatype : str
The datatype to check.
Returns
-------
str
String containing the name of the referenced datatype.
"""
if not is_reference(datatype):
raise ValueError("datatype must be a reference")
if is_list_datatype(datatype):
datatype = get_list_datatype(datatype)
if datatype is None:
raise ValueError("list does not have a list datatype")
if datatype == FILE:
raise ValueError(
"FILE references are not considered references with a record type")
return datatype
def get_id_of_datatype(datatype):
""" returns the id of a Record Type
This is not trivial, as queries may also return children. A check comparing
names is necessary.
Parameters
----------
datatype : string
A datatype, e.g. DOUBLE, or LIST<Person>
Returns
-------
The id of the RecordType with the same name as the datatype.
Raises
------
QueryNotUniqueError
If there are more than one entities with the same name as the datatype.
EmptyUniqueQueryError
If there is no entity with the name of the datatype.
"""
from caosdb import execute_query
if is_list_datatype(datatype):
datatype = get_list_datatype(datatype)
q = "FIND RECORDTYPE {}".format(datatype)
# we cannot use unique=True here, because there might be subtypes
res = execute_query(q)
res = [el for el in res if el.name.lower() == datatype.lower()]
if len(res) > 1:
raise QueryNotUniqueError(
"Name {} did not lead to unique result; Missing "
"implementation".format(datatype))
elif len(res) != 1:
raise EmptyUniqueQueryError(
"No RecordType named {}".format(datatype))
return res[0].id
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.common.datatype`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
This diff is collapsed.
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
import copy
from lxml import etree
from linkahead.common.state import *
from warnings import warn
def _translate_to_state_acis(acis):
result = set()
for aci in acis:
aci = copy.copy(aci)
if aci.role:
aci.role = "?STATE?" + aci.role + "?"
result.add(aci)
return result
class Transition:
"""Transition
Represents allowed transitions from one state to another.
Properties
----------
name : str
The name of the transition
description: str
The description of the transition
from_state : str
A state name
to_state : str
A state name
"""
def __init__(self, name, from_state, to_state, description=None):
self._name = name
self._from_state = from_state
self._to_state = to_state
self._description = description
@property
def name(self):
return self._name
@property
def description(self):
return self._description
@property
def from_state(self):
return self._from_state
@property
def to_state(self):
return self._to_state
def __repr__(self):
return f'Transition(name="{self.name}", from_state="{self.from_state}", to_state="{self.to_state}", description="{self.description}")'
def __eq__(self, other):
return (isinstance(other, Transition)
and other.name == self.name
and other.to_state == self.to_state
and other.from_state == self.from_state)
def __hash__(self):
return 23472 + hash(self.name) + hash(self.from_state) + hash(self.to_state)
@staticmethod
def from_xml(xml):
to_state = [to.get("name") for to in xml
if to.tag.lower() == "tostate"]
from_state = [from_.get("name") for from_ in xml
if from_.tag.lower() == "fromstate"]
result = Transition(name=xml.get("name"),
description=xml.get("description"),
from_state=from_state[0] if from_state else None,
to_state=to_state[0] if to_state else None)
return result
class State:
"""State
Represents the state of an entity and take care of the serialization and
deserialization of xml for the entity state.
An entity state is always a State of a StateModel.
Properties
----------
name : str
Name of the State
model : str
Name of the StateModel
description : str
Description of the State (read-only)
id : str
Id of the undelying State record (read-only)
transitions : set of Transition
All transitions which are available from this state (read-only)
"""
def __init__(self, model, name):
self.name = name
self.model = model
self._id = None
self._description = None
self._transitions = None
@property
def id(self):
return self._id
@property
def description(self):
return self._description
@property
def transitions(self):
return self._transitions
def __eq__(self, other):
return (isinstance(other, State)
and self.name == other.name
and self.model == other.model)
def __hash__(self):
return hash(self.name) + hash(self.model)
def __repr__(self):
return f"State('{self.model}', '{self.name}')"
def to_xml(self):
"""Serialize this State to xml.
Returns
-------
xml : etree.Element
"""
xml = etree.Element("State")
if self.name is not None:
xml.set("name", self.name)
if self.model is not None:
xml.set("model", self.model)
return xml
@staticmethod
def from_xml(xml):
"""Create a new State instance from an xml Element.
Parameters
----------
xml : etree.Element
Returns
-------
state : State
"""
name = xml.get("name")
model = xml.get("model")
result = State(name=name, model=model)
result._id = xml.get("id")
result._description = xml.get("description")
transitions = [Transition.from_xml(t) for t in xml if t.tag.lower() ==
"transition"]
if transitions:
result._transitions = set(transitions)
return result
@staticmethod
def create_state_acl(acl):
from .models import ACL
state_acl = ACL()
state_acl._grants = _translate_to_state_acis(acl._grants)
state_acl._denials = _translate_to_state_acis(acl._denials)
state_acl._priority_grants = _translate_to_state_acis(acl._priority_grants)
state_acl._priority_denials = _translate_to_state_acis(acl._priority_denials)
return state_acl
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.common.state`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
class TimeZone():
"""
TimeZone, e.g. CEST, Europe/Berlin, UTC+4.
from linkahead.common.timezone import *
from warnings import warn
Attributes
----------
zone_id : string
ID of the time zone.
offset : int
Offset to UTC in seconds.
display_name : string
A human-friendly name of the time zone:
"""
def __init__(self, zone_id, offset, display_name):
self.zone_id = zone_id
self.offset = offset
self.display_name = display_name
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.common.timezone`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""mising docstring."""
from lxml import etree
from multiprocessing import Lock
from uuid import uuid4
from sys import hexversion
import warnings
_uuid_lock = Lock()
from linkahead.common.utils import *
from warnings import warn
def xml2str(xml):
return etree.tostring(xml, pretty_print=True, encoding='unicode')
def uuid():
exc = None
ret = None
try:
_uuid_lock.acquire()
ret = uuid4()
except Exception as e:
exc = e
finally:
_uuid_lock.release()
if exc:
raise exc
return ret
def is_int(obj):
try:
int(obj)
return True
except ValueError:
return False
def experimental(message):
def decorator(decorated):
def caller(*args, **kwargs):
warnings.warn(message, UserWarning, stacklevel=3)
return decorated(*args, **kwargs)
return caller
return decorator
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.common.utils`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
""" Versioning module for anything related to entity versions.
Currently this module defines nothing but a single class, `Version`.
"""
from linkahead.common.versioning import *
from warnings import warn
from __future__ import absolute_import
from caosdb.common.utils import xml2str
from lxml import etree
class Version():
"""The version of an entity.
An entity version has a version id (string), a date (UTC timestamp), a
list of predecessors and a list of successors.
Parameters
----------
id : str, optional
See attribute `id`. Default: None
date : str, optional
See attribute `date`. Default: None
username : str, optional
See attribute `username`. Default: None
realm : str, optional
See attribute `realm`. Default: None
predecessors : list of Version, optional
See attribute `predecessors`. Default: empty list.
successors : list of Version, optional
See attribute `successors`. Default: empty list.
is_head : bool
See attribute `is_head`. Default: False
is_complete_history : bool
See attribute `is_complete_history`. Default: False
Attributes
----------
id : str
Version ID (not the entity's id).
date : str
UTC Timestamp of the version, i.e. the date and time when the entity of
this version has been inserted or modified.
username : str
The username of the user who inserted or updated this version.
realm : str
The realm of the user who inserted or updated this version.
predecessors : list of Version
Predecessors are the older entity versions which have been modified
into this version. Usually, there is only one predecessor. However,
this API allows for entities to be merged into one entity, which would
result in more than one predecessor.
successors : list of Version
Successors are newer versions of this entity. If there are successors,
this version is not the latest version of this entity. Usually, there
is only one successor. However, this API allows that a single entity
may co-exist in several versions (e.g. several proposals for the next
entity status). That would result in more than one successor.
is_head : bool or string
If true, this indicates that this version is the HEAD if true.
Otherwise it is not known whether this is the head or not. Any string
matching "true" (case-insensitively) is regarded as True.
Nota bene: This property should typically be set if the server response
indicated that this is the head version.
is_complete_history : bool or string
If true, this indicates that this version contains the full version
history. That means, that the predecessors and successors have their
respective predecessors and successors attached as well and the tree is
completely available. Any string matching "true" (case-insensitively)
is regarded as True.
Nota bene: This property should typically be set if the server response
indicated that the full version history is included in its response.
"""
# pylint: disable=redefined-builtin
def __init__(self, id=None, date=None, username=None, realm=None,
predecessors=None, successors=None, is_head=False,
is_complete_history=False):
"""Typically the `predecessors` or `successors` should not "link back" to an existing Version
object."""
self.id = id
self.date = date
self.username = username
self.realm = realm
self.predecessors = predecessors if predecessors is not None else []
self.successors = successors if successors is not None else []
self.is_head = str(is_head).lower() == "true"
self.is_complete_history = str(is_complete_history).lower() == "true"
def get_history(self):
""" Returns a flat list of Version instances representing the history
of the entity.
The list items are ordered by the relation between the versions,
starting with the oldest version.
The items in the list have no predecessors or successors attached.
Note: This method only returns reliable results if
`self.is_complete_history is True` and it will not retrieve the full
version history if it is not present.
Returns
-------
list of Version
"""
versions = []
for p in self.predecessors:
# assuming that predecessors don't have any successors
versions = p.get_history()
versions.append(Version(id=self.id, date=self.date,
username=self.username, realm=self.realm))
for s in self.successors:
# assuming that successors don't have any predecessors
versions.extend(s.get_history())
return versions
def to_xml(self, tag="Version"):
"""Serialize this version to xml.
The tag name is 'Version' per default. But since this method is called
recursively for the predecessors and successors as well, the tag name
can be configured.
The resulting xml element contains attributes 'id' and 'date' and
'Predecessor' and 'Successor' child elements.
Parameters
----------
tag : str, optional
The name of the returned xml element. Defaults to 'Version'.
Returns
-------
xml : etree.Element
"""
xml = etree.Element(tag)
if self.id is not None:
xml.set("id", self.id)
if self.date is not None:
xml.set("date", self.date)
if self.username is not None:
xml.set("username", self.username)
if self.realm is not None:
xml.set("realm", self.realm)
if self.predecessors is not None:
for p in self.predecessors:
xml.append(p.to_xml(tag="Predecessor"))
if self.is_head is True:
xml.set("head", "true")
if self.successors is not None:
for s in self.successors:
xml.append(s.to_xml(tag="Successor"))
return xml
def __str__(self):
"""Return a stringified xml representation."""
return self.__repr__()
def __repr__(self):
"""Return a stringified xml representation."""
return xml2str(self.to_xml())
@staticmethod
def from_xml(xml):
"""Parse a version object from a 'Version' xml element.
Parameters
----------
xml : etree.Element
A 'Version' xml element, with 'id', possibly 'date', `username`,
`realm`, and `head` attributes as well as 'Predecessor' and
'Successor' child elements.
Returns
-------
version : Version
a new version instance
"""
predecessors = [Version.from_xml(p) for p in xml if p.tag.lower() == "predecessor"]
successors = [Version.from_xml(s) for s in xml if s.tag.lower() == "successor"]
return Version(id=xml.get("id"), date=xml.get("date"),
is_head=xml.get("head"),
is_complete_history=xml.get("completeHistory"),
username=xml.get("username"), realm=xml.get("realm"),
predecessors=predecessors, successors=successors)
def __hash__(self):
"""Hash of the version instance.
Also hashes the predecessors and successors.
"""
return (hash(self.id)
+ hash(self.date)
+ (Version._hash_list(self.predecessors)
if self.predecessors else 26335)
+ (Version._hash_list(self.successors)
if self.successors else -23432))
@staticmethod
def _hash_list(_list):
"""Hash a list by hashing each element and its index."""
result = 12352
for idx, val in enumerate(_list):
result += hash(val) + idx
return result
@staticmethod
def _eq_list(this, that):
"""List equality.
List equality is defined as equality of each element, the order
and length.
"""
if len(this) != len(that):
return False
for v1, v2 in zip(this, that):
if v1 != v2:
return False
return True
def __eq__(self, other):
"""Equality of versions is defined by equality of id, date, and list
equality of the predecessors and successors."""
return (self.id == other.id
and self.date == other.date
and Version._eq_list(self.predecessors, other.predecessors)
and Version._eq_list(self.successors, other.successors))
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.common.versioning`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
import os
import yaml
import warnings
try:
optional_jsonschema_validate = None
from jsonschema import validate as optional_jsonschema_validate
except ImportError:
pass
from linkahead.configuration import *
from warnings import warn
from configparser import ConfigParser
from os import environ, getcwd
from os.path import expanduser, join, isfile
def _reset_config():
global _pycaosdbconf
_pycaosdbconf = ConfigParser(allow_no_value=False)
def configure(inifile):
"""read config from file.
Return a list of files which have successfully been parsed.
"""
global _pycaosdbconf
if "_pycaosdbconf" not in globals():
_pycaosdbconf = None
if _pycaosdbconf is None:
_reset_config()
read_config = _pycaosdbconf.read(inifile)
validate_yaml_schema(config_to_yaml(_pycaosdbconf))
if "HTTPS_PROXY" in environ:
_pycaosdbconf["Connection"]["https_proxy"] = environ["HTTPS_PROXY"]
if "HTTP_PROXY" in environ:
_pycaosdbconf["Connection"]["http_proxy"] = environ["HTTP_PROXY"]
return read_config
def get_config():
global _pycaosdbconf
return _pycaosdbconf
def config_to_yaml(config):
valobj = {}
for s in config.sections():
valobj[s] = {}
for key, value in config[s].items():
# TODO: Can the type be inferred from the config object?
if key in ["timeout", "debug"]:
valobj[s][key] = int(value)
elif key in ["ssl_insecure"]:
valobj[s][key] = bool(value)
else:
valobj[s][key] = value
return valobj
def validate_yaml_schema(valobj):
if optional_jsonschema_validate:
with open(os.path.join(os.path.dirname(__file__), "schema-pycaosdb-ini.yml")) as f:
schema = yaml.load(f, Loader=yaml.SafeLoader)
optional_jsonschema_validate(instance=valobj, schema=schema["schema-pycaosdb-ini"])
else:
warnings.warn("""
Warning: The validation could not be performed because `jsonschema` is not installed.
""")
def _read_config_files():
"""Function to read config files from different paths.
Checks for path either in ``$PYCAOSDBINI`` or home directory (``.pycaosdb.ini``), and
additionally in the current working directory (``pycaosdb.ini``).
Returns
-------
ini files: list
The successfully parsed ini-files. Order: env_var or home directory, cwd. Used for testing the function.
"""
return_var = []
if "PYCAOSDBINI" in environ:
return_var.extend(configure(expanduser(environ["PYCAOSDBINI"])))
else:
return_var.extend(configure(expanduser('~/.pycaosdb.ini')))
if isfile(join(getcwd(), "pycaosdb.ini")):
return_var.extend(configure(join(getcwd(), "pycaosdb.ini")))
return return_var
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.configuration`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
from linkahead.connection import *
from warnings import warn
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
from linkahead.connection.authentication import *
from warnings import warn
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
#! -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <f.fitschen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""auth_token.
An Authentictor which only uses only a pre-supplied authentication token.
"""
from __future__ import absolute_import, unicode_literals, print_function
from .interface import AbstractAuthenticator, CaosDBServerConnection
from caosdb.connection.utils import auth_token_to_cookie
from caosdb.exceptions import LoginFailedError
from linkahead.connection.authentication.auth_token import *
from warnings import warn
def get_authentication_provider():
"""get_authentication_provider.
Return an authenticator which only uses a pre-supplied authentication
token.
Returns
-------
AuthTokenAuthenticator
"""
return AuthTokenAuthenticator()
class AuthTokenAuthenticator(AbstractAuthenticator):
"""AuthTokenAuthenticator.
Subclass of AbstractAuthenticator which provides authentication only via
a given authentication token.
Methods
-------
login
logout
configure
"""
def __init__(self):
super(AuthTokenAuthenticator, self).__init__()
self.auth_token = None
self._connection = None
def login(self):
self._login()
def _login(self):
raise LoginFailedError("The authentication token is expired or you "
"have been logged out otherwise. The "
"auth_token authenticator cannot log in "
"again. You must provide a new "
"authentication token.")
def logout(self):
self._logout()
def _logout(self):
self.logger.debug("[LOGOUT]")
if self.auth_token is not None:
headers = {'Cookie': auth_token_to_cookie(self.auth_token)}
self._connection.request(method="DELETE", path="logout",
headers=headers)
self.auth_token = None
def configure(self, **config):
if "auth_token" in config:
self.auth_token = config["auth_token"]
if "connection" in config:
self._connection = config["connection"]
if not isinstance(self._connection, CaosDBServerConnection):
raise Exception("""Bad configuration of the caosdb connection.
The `connection` must be an instance of
`CaosDBConnection`.""")
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication.auth_token`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""external_credentials_provider."""
from __future__ import absolute_import, unicode_literals
from abc import ABCMeta
import logging
from .plain import PlainTextCredentialsProvider
# meta class compatible with Python 2 *and* 3:
ABC = ABCMeta(str('ABC'), (object, ), {str('__slots__'): ()})
from linkahead.connection.authentication.external_credentials_provider import *
from warnings import warn
class ExternalCredentialsProvider(PlainTextCredentialsProvider, ABC):
"""ExternalCredentialsProvider.
Abstract subclass of PlainTextCredentialsProvider which should be used to
implement external credentials provider (e.g. pass, keyring, or any other call
to an external program, which presents the plain text password, which is to be
used for the authentication.
Parameters
----------
callback: Function
A function which has **kwargs argument. This funktion will be called
each time a password is needed with the current connection
configuration as parameters.
"""
def __init__(self, callback):
super(ExternalCredentialsProvider, self).__init__()
self._callback = callback
self._config = None
def configure(self, **config):
"""configure.
Parameters
----------
**config
Keyword arguments containing the necessary arguments for the
concrete implementation of this class.
Attributes
----------
password : str
The password. This password is not stored in this class. A callback
is called to provide the password each time this property is
called.
Returns
-------
None
"""
if "password" in config:
if "password_method" in config:
authm = "`{}`".format(config["password_method"])
else:
authm = "an external credentials provider"
self.logger.log(logging.WARNING,
("`password` defined. You configured caosdb to "
"use %s as authentication method and yet "
"provided a password yourself. This indicates "
"a misconfiguration (e.g. in your "
"pycaosdb.ini) and should be avoided."),
authm)
self._config = dict(config)
super(ExternalCredentialsProvider, self).configure(**config)
@property
def password(self):
return self._callback(**self._config)
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication.external_credentials_provider`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""input.
A CredentialsProvider which reads the password from the input line.
"""
from __future__ import absolute_import, unicode_literals, print_function
from .interface import CredentialsProvider, CredentialsAuthenticator
from linkahead.connection.authentication.input import *
from warnings import warn
import getpass
def get_authentication_provider():
"""get_authentication_provider.
Return an authenticator which uses the input for username/password credentials.
Returns
-------
CredentialsAuthenticator
with an InputCredentialsProvider as back-end.
"""
return CredentialsAuthenticator(InputCredentialsProvider())
class InputCredentialsProvider(CredentialsProvider):
"""InputCredentialsProvider.
A class for obtaining the password directly from the user.
Methods
-------
configure
Attributes
----------
password
username
"""
def __init__(self):
super(InputCredentialsProvider, self).__init__()
self._password = None
self._username = None
def configure(self, **config):
"""configure.
Parameters
----------
**config
Keyword arguments containing at least keywords "username" and "password".
Returns
-------
None
"""
if config.get("username"):
self._username = config["username"]
else:
self._username = input("Please enter the user name: ")
url = config["url"]
self._password = getpass.getpass(
"Please enter the password for `{}` at `{}`: ".format(
self._username, url))
@property
def password(self):
return self._password
@property
def username(self):
return self._username
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication.input`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""This module provides the interfaces for authenticating requests to the
caosdb server.
Implementing modules muts provide a `get_authentication_provider()` method.
"""
from abc import ABCMeta, abstractmethod, abstractproperty
import logging
from caosdb.connection.utils import urlencode
from caosdb.connection.interface import CaosDBServerConnection
from caosdb.connection.utils import parse_auth_token, auth_token_to_cookie
from caosdb.exceptions import LoginFailedError
from linkahead.connection.authentication.interface import *
from warnings import warn
# meta class compatible with Python 2 *and* 3:
ABC = ABCMeta('ABC', (object, ), {'__slots__': ()})
_LOGGER = logging.getLogger(__name__)
class AbstractAuthenticator(ABC):
"""AbstractAuthenticator.
Interface for different authentication mechanisms. e.g. username/password
authentication or SSH key authentication.
Attributes
----------
logger : Logger
A logger which should be used for all logging which has to do with
authentication.
auth_token : str
A string representation of a CaosDB Auth Token.
Methods
-------
login (abstract)
logout (abstract)
configure (abstract)
on_request
on_response
"""
def __init__(self):
self.auth_token = None
self.logger = _LOGGER
@abstractmethod
def login(self):
"""login.
To be implemented by the child classes.
Returns
-------
None
"""
pass
@abstractmethod
def logout(self):
"""logout.
To be implemented by the child classes.
Returns
-------
None
"""
pass
@abstractmethod
def configure(self, **config):
"""configure.
Configure this authenticator.
Parameters
----------
**config
Keyword arguments for the configuration.
Returns
-------
None
"""
pass
def on_response(self, response):
"""on_response.
A call-back with is to be called by the connection after each
response. This method reads the latest auth cookie from the response.
Parameters
----------
response : CaosDBHTTPResponse
The response of the server
Returns
-------
"""
self.auth_token = parse_auth_token(
response.getheader("Set-Cookie"))
def on_request(self, method, path, headers, **kwargs):
# pylint: disable=unused-argument
"""on_request.
A call-back which is to be called by the connection before each
request. This method set the auth cookie for that request.
Parameters
----------
method : str
The request method.
path : str
The request path.
headers : dict
A dictionary with headers which are to be set.
**kwargs
Ignored
Returns
-------
"""
if self.auth_token is None:
self.login()
if self.auth_token is not None:
headers['Cookie'] = auth_token_to_cookie(self.auth_token)
class CredentialsAuthenticator(AbstractAuthenticator):
"""CredentialsAuthenticator.
Subclass of AbstractAuthenticator which provides authentication via
credentials (username/password). This class always needs a
credentials_provider which provides valid credentials_provider before each
login.
Parameters
----------
credentials_provider : CredentialsProvider
The source for the username and the password.
Methods
-------
login
logout
configure
"""
def __init__(self, credentials_provider):
super(CredentialsAuthenticator, self).__init__()
self._credentials_provider = credentials_provider
self._connection = None
self.auth_token = None
def login(self):
self._login()
def logout(self):
self._logout()
def _logout(self):
self.logger.debug("[LOGOUT]")
if self.auth_token is not None:
self._connection.request(method="DELETE", path="logout")
self.auth_token = None
def _login(self):
username = self._credentials_provider.username
password = self._credentials_provider.password
self.logger.debug("[LOGIN] %s", username)
# we need a username for this:
if username is None:
raise LoginFailedError("No username was given.")
if password is None:
raise LoginFailedError("No password was given")
headers = {}
headers["Content-Type"] = "application/x-www-form-urlencoded"
body = urlencode({"username": username, "password": password})
response = self._connection.request(method="POST",
path="login",
headers=headers, body=body)
response.read() # clear socket
if response.status != 200:
raise LoginFailedError("LOGIN WAS NOT SUCCESSFUL")
self.on_response(response)
return response
def configure(self, **config):
self._credentials_provider.configure(**config)
if "connection" in config:
self._connection = config["connection"]
if not isinstance(self._connection, CaosDBServerConnection):
raise Exception("""Bad configuration of the caosdb connection.
The `connection` must be an instance of
`CaosDBConnection`.""")
class CredentialsProvider(ABC):
"""CredentialsProvider.
An abstract class for username/password authentication.
Attributes
----------
password (abstract)
username (abstract)
logger : Logger
A logger which should be used for all logging which has to do with the
provision of credentials. This is usually just the "authentication"
logger.
Methods
-------
configure (abstract)
"""
def __init__(self):
self.logger = _LOGGER
@abstractmethod
def configure(self, **config):
"""configure.
Configure the credentials provider with a dict.
Parameters
----------
**config
Keyword arguments. The relevant arguments depend on the
implementing subclass of this class.
Returns
-------
None
"""
@abstractproperty
def password(self):
"""password."""
@abstractproperty
def username(self):
"""username."""
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication.interface`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""keyring.
A CredentialsProvider which uses the external 'keyring' library to
retrieve the password.
"""
from linkahead.connection.authentication.keyring import *
from warnings import warn
import sys
import importlib
from getpass import getpass
from caosdb.exceptions import ConfigurationError
from .external_credentials_provider import ExternalCredentialsProvider
from .interface import CredentialsAuthenticator
def get_authentication_provider():
"""get_authentication_provider.
Return an authenticator which uses plain text username/password credentials.
The difference to the `plain` module is that this implementation retrieves
the password from the external gnome keyring.
Returns
-------
CredentialsAuthenticator
with a 'KeyringCaller' as back-end.
"""
return CredentialsAuthenticator(KeyringCaller(callback=_call_keyring))
def _get_external_keyring():
try:
return importlib.import_module("keyring")
except ImportError:
raise RuntimeError(
"The keyring password method requires installation of the"
"keyring python package. On linux with python < 3.5, "
"this requires the installation of dbus-python as a "
"system package.")
def _call_keyring(**config):
if "username" not in config:
raise ConfigurationError("Your configuration did not provide a "
"`username` which is needed by the "
"`KeyringCaller` to retrieve the "
"password in question.")
url = config.get("url")
username = config.get("username")
app = "caosdb — {}".format(url)
external_keyring = _get_external_keyring()
password = external_keyring.get_password(app, username)
if password is None:
print("No password for user {} on {} found in keyring."
.format(username, app))
password = getpass("Enter password to save "
"in system keyring/wallet: ")
external_keyring.set_password(app, username, password)
return password
class KeyringCaller(ExternalCredentialsProvider):
"""KeyringCaller.
A class for retrieving the password from the external 'gnome keyring' and
storing the username/password credentials as plain text strings.
Methods
-------
configure
Attributes
----------
password
username
"""
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication.keyring`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""pass.
A CredentialsProvider which calls the external program 'pass' for the
password.
"""
from linkahead.connection.authentication.pass import *
from warnings import warn
from subprocess import check_output, CalledProcessError
from caosdb.exceptions import ConfigurationError
from .interface import CredentialsAuthenticator
from .external_credentials_provider import ExternalCredentialsProvider
def get_authentication_provider():
"""get_authentication_provider.
Return an authenticator which uses plain text username/password credentials.
The difference to the `plain` module is that this implementation retrieves
the password from the external program 'pass'.
Returns
-------
CredentialsAuthenticator
with a 'PassCaller' as back-end.
"""
return CredentialsAuthenticator(PassCaller(callback=_call_pass))
def _call_pass(**config):
if "password_identifier" not in config:
raise ConfigurationError("Your configuration did not provide a "
"`password_identifier` which is needed "
"by the `PassCaller` to retrieve the "
"password in question.")
try:
return check_output(
"pass " + config["password_identifier"],
shell=True).splitlines()[0].decode("UTF-8")
except CalledProcessError as exc:
raise RuntimeError(
"Password manager returned error code {}. This usually "
"occurs if the password_identifier in .pycaosdb.ini is "
"incorrect or missing.".format(exc.returncode))
class PassCaller(ExternalCredentialsProvider):
"""PassCaller.
A class for retrieving the password from the external program 'pass' and
storing the username/password credentials as plain text strings.
Methods
-------
configure
Attributes
----------
password
username
"""
# all the work is done in _call_pass and the super class
pass
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication.pass`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""plain.
A CredentialsProvider which reads the password from the configuration
dict.
"""
from __future__ import absolute_import, unicode_literals, print_function
from .interface import CredentialsProvider, CredentialsAuthenticator
from linkahead.connection.authentication.plain import *
from warnings import warn
def get_authentication_provider():
"""get_authentication_provider.
Return an authenticator which uses plain text username/password credentials.
Returns
-------
CredentialsAuthenticator
with a PlainTextCredentialsProvider as back-end.
"""
return CredentialsAuthenticator(PlainTextCredentialsProvider())
class PlainTextCredentialsProvider(CredentialsProvider):
"""PlainTextCredentialsProvider.
A class for storing username/password credentials as plain text strings.
Methods
-------
configure
Attributes
----------
password
username
"""
def __init__(self):
super(PlainTextCredentialsProvider, self).__init__()
self._password = None
self._username = None
def configure(self, **config):
"""configure.
Parameters
----------
**config
Keyword arguments containing at least keywords "username" and "password".
Returns
-------
None
"""
if "password" in config:
self._password = config["password"]
if "username" in config:
self._username = config["username"]
@property
def password(self):
return self._password
@property
def username(self):
return self._username
warn(("CaosDB was renamed to LinkAhead. Please import this library as `import linkahead.connection.authentication.plain`. Using the"
" old name, starting with caosdb, is deprecated."), DeprecationWarning)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment