Skip to content
Snippets Groups Projects
Verified Commit b09ad9c1 authored by Daniel Hornung's avatar Daniel Hornung
Browse files

MAINT: Styling, linting

parent c7b91387
No related branches found
No related tags found
2 merge requests!143Release 0.15.0,!135Add and fix more type hints
Pipeline #50588 passed with warnings
Showing
with 220 additions and 194 deletions
[aliases]
test=pytest
[pycodestyle]
ignore=E501,E121,E123,E126,E226,E24,E704,W503,W504
[mypy]
ignore_missing_imports = True
\ No newline at end of file
......@@ -6,6 +6,8 @@
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2020-2022 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Joscha Schmiedt <joscha@schmiedt.dev>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -29,7 +31,7 @@ from __future__ import annotations
import logging
import warnings
from collections.abc import Iterable
from typing import Any, Dict, List, Union, Optional, Tuple
from typing import Any, Union, Optional
from .common.datatype import is_reference
from .common.models import (SPECIAL_ATTRIBUTES, Container, Entity, File,
......@@ -94,14 +96,14 @@ def new_record(record_type: Union[str],
return r
def id_query(ids: List[int]) -> Container:
def id_query(ids: list[int]) -> Container:
warnings.warn("Please use 'create_id_query', which only creates"
"the string.", DeprecationWarning)
return execute_query(create_id_query(ids)) # type: ignore
def create_id_query(ids: List[int]) -> str:
def create_id_query(ids: list[int]) -> str:
return "FIND ENTITY WITH " + " OR ".join(
["ID={}".format(id) for id in ids])
......@@ -133,7 +135,7 @@ def retrieve_entity_with_id(eid: int):
return execute_query("FIND ENTITY WITH ID={}".format(eid), unique=True)
def retrieve_entities_with_ids(entities: List) -> Container:
def retrieve_entities_with_ids(entities: list) -> Container:
collection = Container()
step = 20
......@@ -180,7 +182,7 @@ def getCommitIn(folder):
def compare_entities(old_entity: Entity,
new_entity: Entity,
compare_referenced_records: bool = False
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Compare two entites.
Return a tuple of dictionaries, the first index belongs to additional information for old
......@@ -214,8 +216,8 @@ def compare_entities(old_entity: Entity,
identical records are stored in different objects. Default is False.
"""
olddiff: Dict[str, Any] = {"properties": {}, "parents": []}
newdiff: Dict[str, Any] = {"properties": {}, "parents": []}
olddiff: dict[str, Any] = {"properties": {}, "parents": []}
newdiff: dict[str, Any] = {"properties": {}, "parents": []}
if old_entity is new_entity:
return (olddiff, newdiff)
......@@ -295,12 +297,15 @@ def compare_entities(old_entity: Entity,
elif isinstance(prop.value, list) and isinstance(matching[0].value, list):
# all elements in both lists actually are entity objects
# TODO: check, whether mixed cases can be allowed or should lead to an error
if all([isinstance(x, Entity) for x in prop.value]) and all([isinstance(x, Entity) for x in matching[0].value]):
if (all([isinstance(x, Entity) for x in prop.value])
and all([isinstance(x, Entity) for x in matching[0].value])):
# can't be the same if the lengths are different
if len(prop.value) == len(matching[0].value):
# do a one-by-one comparison; the values are the same, if all diffs are empty
# do a one-by-one comparison:
# the values are the same if all diffs are empty
same_value = all(
[empty_diff(x, y, False) for x, y in zip(prop.value, matching[0].value)])
[empty_diff(x, y, False) for x, y
in zip(prop.value, matching[0].value)])
if not same_value:
olddiff["properties"][prop.name]["value"] = prop.value
......@@ -333,7 +338,8 @@ def compare_entities(old_entity: Entity,
return (olddiff, newdiff)
def empty_diff(old_entity: Entity, new_entity: Entity, compare_referenced_records: bool = False) -> bool:
def empty_diff(old_entity: Entity, new_entity: Entity,
compare_referenced_records: bool = False) -> bool:
"""Check whether the `compare_entities` found any differences between
old_entity and new_entity.
......@@ -606,7 +612,7 @@ def resolve_reference(prop: Property):
prop.value = retrieve_entity_with_id(prop.value)
def create_flat_list(ent_list: List[Entity], flat: List[Entity]):
def create_flat_list(ent_list: list[Entity], flat: list[Entity]):
"""
Recursively adds all properties contained in entities from ent_list to
the output list flat. Each element will only be added once to the list.
......
......@@ -5,6 +5,8 @@
# Copyright (C) 2023 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2023 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2023 Daniel Hornung <d.hornung@indiscale.com>
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Joscha Schmiedt <joscha@schmiedt.dev>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -35,7 +37,7 @@ See also
from __future__ import annotations
from enum import Enum
from functools import lru_cache
from typing import Union, Optional, Tuple, Any, Dict
from typing import Any, Optional, Union
from .exceptions import EmptyUniqueQueryError, QueryNotUniqueError
from .utils import get_entity
......@@ -46,7 +48,7 @@ from .common.models import execute_query, Entity, Container
DEFAULT_SIZE = 33333
# This dict cache is solely for filling the real cache manually (e.g. to reuse older query results)
_DUMMY_CACHE: Dict[Union[str, int], Any] = {}
_DUMMY_CACHE: dict[Union[str, int], Any] = {}
class AccessType(Enum):
......@@ -63,7 +65,7 @@ class AccessType(Enum):
def cached_get_entity_by(eid: Union[str, int, None] = None,
name: Optional[str] = None,
path: Optional[str] = None,
query: Optional[str] = None) -> Union[Entity, Tuple[None]]:
query: Optional[str] = None) -> Union[Entity, tuple[None]]:
"""Return a single entity that is identified uniquely by one argument.
You must supply exactly one argument.
......@@ -178,7 +180,7 @@ def cache_initialize(maxsize: int = DEFAULT_SIZE) -> None:
_cached_access = lru_cache(maxsize=maxsize)(_cached_access.__wrapped__)
def cache_fill(items: Dict[Union[str, int], Any],
def cache_fill(items: dict[Union[str, int], Any],
kind: AccessType = AccessType.EID,
unique: bool = True) -> None:
"""Add entries to the cache manually.
......
......@@ -24,7 +24,7 @@
# ** end header
#
from __future__ import annotations
"""missing docstring."""
"""Utility functions for server and user administration."""
import random
import re
......@@ -38,7 +38,7 @@ from ..exceptions import (EntityDoesNotExistError, HTTPClientError,
ServerConfigurationException)
from .utils import xml2str
from typing import Dict, Optional, TYPE_CHECKING, Union
from typing import Optional, TYPE_CHECKING, Union
if TYPE_CHECKING:
from ..common.models import Entity
......@@ -69,7 +69,7 @@ def set_server_property(key: str, value: str):
"Debug mode in server is probably disabled.") from None
def get_server_properties() -> Dict[str, Optional[str]]:
def get_server_properties() -> dict[str, Optional[str]]:
"""get_server_properties.
Get all server properties as a dict.
......@@ -88,7 +88,7 @@ def get_server_properties() -> Dict[str, Optional[str]]:
"Debug mode in server is probably disabled.") from None
xml = etree.parse(body)
props: Dict[str, Optional[str]] = dict()
props: dict[str, Optional[str]] = dict()
for elem in xml.getroot():
props[elem.tag] = elem.text
......@@ -184,7 +184,7 @@ def _update_user(name: str,
email: Optional[str] = None,
entity: Optional[Entity] = None, **kwargs):
con = get_connection()
params: Dict[str, Optional[str]] = {}
params: dict[str, Optional[str]] = {}
if password is not None:
params["password"] = password
......@@ -218,7 +218,7 @@ def _insert_user(name: str,
email: Optional[str] = None,
entity: Optional[Entity] = None, **kwargs):
con = get_connection()
params: Dict[str, Union[str, Entity]] = {"username": name}
params: dict[str, Union[str, Entity]] = {"username": name}
if password is not None:
params["password"] = password
......
......@@ -24,11 +24,10 @@
#
from __future__ import annotations
import re
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING and sys.version_info > (3, 7):
from typing import Literal, Union, List
if TYPE_CHECKING:
from typing import Literal, Union
from linkahead.common.models import Entity, Container
DATATYPE = Literal["DOUBLE", "REFERENCE", "TEXT", "DATETIME", "INTEGER", "FILE", "BOOLEAN"]
......@@ -46,8 +45,7 @@ BOOLEAN = "BOOLEAN"
def LIST(datatype: Union[str, Entity, DATATYPE]) -> str:
# FIXME May be ambiguous (if name duplicate) or insufficient (if only ID exists).
if hasattr(datatype, "name"):
datatype = datatype.name
datatype = getattr(datatype, "name", datatype)
return "LIST<" + str(datatype) + ">"
......@@ -179,7 +177,7 @@ def get_id_of_datatype(datatype: str) -> int:
res: Container = execute_query(q) # type: ignore
if isinstance(res, int):
raise ValueError("FIND RECORDTYPE query returned an `int`")
res: List[Entity] = [el for el in res if el.name.lower() == datatype.lower()] # type: ignore
res: list[Entity] = [el for el in res if el.name.lower() == datatype.lower()] # type: ignore
if len(res) > 1:
raise QueryNotUniqueError(
......
This diff is collapsed.
......@@ -26,7 +26,7 @@ from lxml import etree
from typing import TYPE_CHECKING
import sys
if TYPE_CHECKING and sys.version_info > (3, 7):
if TYPE_CHECKING:
from typing import Optional
from linkahead.common.models import ACL, ACI
......
......@@ -26,14 +26,13 @@
Currently this module defines nothing but a single class, `Version`.
"""
from __future__ import absolute_import, annotations
from __future__ import annotations
from .utils import xml2str
from lxml import etree
from typing import TYPE_CHECKING
import sys
if TYPE_CHECKING and sys.version_info > (3, 7):
from typing import Optional, List, Union, Literal
if TYPE_CHECKING:
from typing import Optional, List, Union
class Version():
......@@ -206,10 +205,8 @@ object."""
version : Version
a new version instance
"""
predecessors = [Version.from_xml(
p) for p in xml if p.tag.lower() == "predecessor"]
successors = [Version.from_xml(s)
for s in xml if s.tag.lower() == "successor"]
predecessors = [Version.from_xml(p) for p in xml if p.tag.lower() == "predecessor"]
successors = [Version.from_xml(s) for s in xml if s.tag.lower() == "successor"]
return Version(id=xml.get("id"), date=xml.get("date"),
is_head=xml.get("head"),
is_complete_history=xml.get("completeHistory"),
......
......@@ -37,7 +37,7 @@ from configparser import ConfigParser
from os import environ, getcwd
from os.path import expanduser, isfile, join
from typing import Dict, Union, Callable, Optional
from typing import Union, Callable, Optional
_pycaosdbconf = ConfigParser(allow_no_value=False)
......@@ -72,8 +72,8 @@ def get_config() -> ConfigParser:
return _pycaosdbconf
def config_to_yaml(config: ConfigParser) -> Dict[str, Dict[str, Union[int, str, bool]]]:
valobj: Dict[str, Dict[str, Union[int, str, bool]]] = {}
def config_to_yaml(config: ConfigParser) -> dict[str, dict[str, Union[int, str, bool]]]:
valobj: dict[str, dict[str, Union[int, str, bool]]] = {}
for s in config.sections():
valobj[s] = {}
for key, value in config[s].items():
......@@ -88,7 +88,7 @@ def config_to_yaml(config: ConfigParser) -> Dict[str, Dict[str, Union[int, str,
return valobj
def validate_yaml_schema(valobj: Dict[str, Dict[str, Union[int, str, bool]]]):
def validate_yaml_schema(valobj: dict[str, dict[str, Union[int, str, bool]]]):
if optional_jsonschema_validate:
with open(os.path.join(os.path.dirname(__file__), "schema-pycaosdb-ini.yml")) as f:
schema = yaml.load(f, Loader=yaml.SafeLoader)
......
......@@ -25,7 +25,7 @@
A CredentialsProvider which reads the password from the input line.
"""
from __future__ import absolute_import, unicode_literals, print_function, annotations
from __future__ import annotations
from .interface import CredentialsProvider, CredentialsAuthenticator
from typing import Optional
import getpass
......
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Joscha Schmiedt <joscha@schmiedt.dev>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -18,14 +19,13 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""This module provides the interfaces for authenticating requests to the
LinkAhead server.
Implementing modules muts provide a `get_authentication_provider()` method.
Implementing modules must provide a `get_authentication_provider()` method.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
import logging
......@@ -33,10 +33,10 @@ from ..utils import urlencode
from ..interface import CaosDBServerConnection
from ..utils import parse_auth_token, auth_token_to_cookie
from ...exceptions import LoginFailedError
from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from ..interface import CaosDBHTTPResponse
QueryDict = Dict[str, Optional[str]]
QueryDict = dict[str, Optional[str]]
_LOGGER = logging.getLogger(__name__)
......
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (c) 2019 Daniel Hornung
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Joscha Schmiedt <joscha@schmiedt.dev>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -23,7 +23,7 @@
# ** end header
#
"""Connection to a LinkAhead server."""
from __future__ import absolute_import, print_function, unicode_literals, annotations
from __future__ import annotations
import logging
import ssl
......@@ -33,7 +33,6 @@ from builtins import str # pylint: disable=redefined-builtin
from errno import EPIPE as BrokenPipe
from socket import error as SocketError
from urllib.parse import ParseResult, quote, urlparse
from warnings import warn
from requests import Session as HTTPSession
from requests.adapters import HTTPAdapter
......@@ -52,15 +51,13 @@ try:
except ModuleNotFoundError:
version = "uninstalled"
from pkg_resources import resource_filename
from .encode import MultipartYielder, ReadableMultiparts
from .interface import CaosDBHTTPResponse, CaosDBServerConnection
from .utils import make_uri_path, parse_url, urlencode
from .utils import make_uri_path, urlencode
from typing import TYPE_CHECKING
if TYPE_CHECKING and sys.version_info > (3, 7):
from typing import Optional, List, Any, Iterator, Dict, Union
if TYPE_CHECKING:
from typing import Optional, Any, Iterator, Union
from requests.models import Response
from ssl import _SSLMethod
from .authentication.interface import AbstractAuthenticator, CredentialsAuthenticator
......@@ -171,7 +168,7 @@ class _DefaultCaosDBServerConnection(CaosDBServerConnection):
def request(self,
method: str, path: str,
headers: Optional[Dict[str, str]] = None,
headers: Optional[dict[str, str]] = None,
body: Union[str, bytes, None] = None,
**kwargs) -> _WrappedHTTPResponse:
"""request.
......@@ -250,7 +247,8 @@ class _DefaultCaosDBServerConnection(CaosDBServerConnection):
"do so via linkahead.configure_connection(...) or in a config "
"file.")
url_string: str = config["url"]
if (not url_string.lower().startswith("https://") and not url_string.lower().startswith("http://")):
if (not url_string.lower().startswith("https://")
and not url_string.lower().startswith("http://")):
raise LinkAheadConnectionError("The connection url is expected "
"to be a http or https url and "
"must include the url scheme "
......@@ -289,7 +287,7 @@ class _DefaultCaosDBServerConnection(CaosDBServerConnection):
if "timeout" in config:
self._timeout = config["timeout"]
def _setup_ssl(self, config: Dict[str, Any]):
def _setup_ssl(self, config: dict[str, Any]):
if "ssl_version" in config and config["cacert"] is not None:
ssl_version = getattr(ssl, config["ssl_version"])
else:
......@@ -431,7 +429,9 @@ def configure_connection(**kwargs):
auth_token : str (optional)
An authentication token which has been issued by the LinkAhead Server.
Implies `password_method="auth_token"` if set. An example token string would be `["O","OneTimeAuthenticationToken","anonymous",["administration"],[],1592995200000,604800000,"3ZZ4WKRB-5I7DG2Q6-ZZE6T64P-VQ","197d0d081615c52dc18fb323c300d7be077beaad4020773bb58920b55023fa6ee49355e35754a4277b9ac525c882bcd3a22e7227ba36dfcbbdbf8f15f19d1ee9",1,30000]`.
Implies `password_method="auth_token"` if set. An example token string would be
``["O","OneTimeAuthenticationToken","anonymous",["administration"],[],1592995200000,
604800000,"3ZZ4WKRB-5I7DG2Q6-ZZE6T64P-VQ","197d0d081615...1ee9",1,30000]``.
https_proxy : str, optional
Define a proxy for the https connections, e.g. `http://localhost:8888`,
......@@ -599,8 +599,8 @@ class _Connection(object): # pylint: disable=useless-object-inheritance
return self
def retrieve(self,
entity_uri_segments: Optional[List[str]] = None,
query_dict: Optional[Dict[str, Optional[str]]] = None,
entity_uri_segments: Optional[list[str]] = None,
query_dict: Optional[dict[str, Optional[str]]] = None,
**kwargs) -> CaosDBHTTPResponse:
path = make_uri_path(entity_uri_segments, query_dict)
......@@ -608,8 +608,9 @@ class _Connection(object): # pylint: disable=useless-object-inheritance
return http_response
def delete(self, entity_uri_segments: Optional[List[str]] = None,
query_dict: Optional[Dict[str, Optional[str]]] = None, **kwargs) -> CaosDBHTTPResponse:
def delete(self, entity_uri_segments: Optional[list[str]] = None,
query_dict: Optional[dict[str, Optional[str]]] = None, **kwargs) -> (
CaosDBHTTPResponse):
path = make_uri_path(entity_uri_segments, query_dict)
http_response = self._http_request(
......@@ -617,8 +618,9 @@ class _Connection(object): # pylint: disable=useless-object-inheritance
return http_response
def update(self, entity_uri_segment: Optional[List[str]],
query_dict: Optional[Dict[str, Optional[str]]] = None, **kwargs) -> CaosDBHTTPResponse:
def update(self, entity_uri_segment: Optional[list[str]],
query_dict: Optional[dict[str, Optional[str]]] = None, **kwargs) -> (
CaosDBHTTPResponse):
path = make_uri_path(entity_uri_segment, query_dict)
http_response = self._http_request(method="PUT", path=path, **kwargs)
......@@ -640,13 +642,15 @@ class _Connection(object): # pylint: disable=useless-object-inheritance
return self._form_data_request(
method="PUT", path=entity_uri_segment, params=params)
def post_form_data(self, entity_uri_segment: str, params: Dict[str, Optional[str]]) -> CaosDBHTTPResponse:
def post_form_data(self, entity_uri_segment: str, params: dict[str, Optional[str]]) -> (
CaosDBHTTPResponse):
return self._form_data_request(
method="POST",
path=entity_uri_segment,
params=params)
def _form_data_request(self, method: str, path: str, params: Dict[str, Optional[str]]) -> CaosDBHTTPResponse:
def _form_data_request(self, method: str, path: str, params: dict[str, Optional[str]]) -> (
CaosDBHTTPResponse):
body = urlencode(params)
headers = {}
headers["Content-Type"] = "application/x-www-form-urlencoded"
......@@ -658,8 +662,8 @@ class _Connection(object): # pylint: disable=useless-object-inheritance
return response
def insert(self, entity_uri_segment: Optional[List[str]],
query_dict: Optional[Dict[str, Optional[str]]] = None,
def insert(self, entity_uri_segment: Optional[list[str]],
query_dict: Optional[dict[str, Optional[str]]] = None,
body: Union[str, bytes, None] = None, **kwargs) -> CaosDBHTTPResponse:
path = make_uri_path(entity_uri_segment, query_dict)
......@@ -686,7 +690,7 @@ class _Connection(object): # pylint: disable=useless-object-inheritance
self._authenticator.logout()
def _http_request(self, method: str, path: str,
headers: Optional[Dict["str", Any]] = None,
headers: Optional[dict["str", Any]] = None,
body: Union[str, bytes, None] = None, **kwargs):
try:
return self._retry_http_request(method=method, path=path,
......@@ -713,7 +717,7 @@ class _Connection(object): # pylint: disable=useless-object-inheritance
def _retry_http_request(self,
method: str,
path: str,
headers: Optional[Dict["str", Any]],
headers: Optional[dict["str", Any]],
body: Union[str, bytes, None], **kwargs) -> CaosDBHTTPResponse:
if hasattr(body, "encode") and body is not None:
......
......@@ -5,6 +5,8 @@
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Joscha Schmiedt <joscha@schmiedt.dev>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -63,8 +65,8 @@ import os
import mimetypes
from email.header import Header
from typing import TYPE_CHECKING
import sys
if TYPE_CHECKING and sys.version_info > (3, 7):
if TYPE_CHECKING:
from typing import Optional
......@@ -152,14 +154,6 @@ class MultipartParam(object):
except BaseException:
raise ValueError("Could not determine filesize")
def __cmp__(self, other):
attrs = [
'name', 'value', 'filename', 'filetype', 'filesize', 'fileobj'
]
myattrs = [getattr(self, a) for a in attrs]
oattrs = [getattr(other, a) for a in attrs]
return cmp(myattrs, oattrs)
def reset(self):
"""Reset the file object's read pointer."""
if self.fileobj is not None:
......
......@@ -29,7 +29,7 @@ from warnings import warn
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional, Dict, Union
from typing import Optional, Union
class CaosDBHTTPResponse(ABC):
......@@ -59,7 +59,7 @@ class CaosDBHTTPResponse(ABC):
"""Status code of the response."""
@abstractmethod
def getheaders(self) -> Dict[str, str]:
def getheaders(self) -> dict[str, str]:
"""Return all headers."""
def __enter__(self):
......@@ -85,7 +85,7 @@ class CaosDBServerConnection(ABC):
def request(self,
method: str,
path: str,
headers: Optional[Dict[str, str]] = None,
headers: Optional[dict[str, str]] = None,
body: Union[str, bytes, None] = None,
**kwargs) -> CaosDBHTTPResponse:
"""Abstract method. Implement this method for HTTP requests to the
......
......@@ -5,6 +5,8 @@
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Joscha Schmiedt <joscha@schmiedt.dev>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -22,19 +24,18 @@
# ** end header
#
"""Utility functions for the connection module."""
from __future__ import unicode_literals, print_function, annotations
from builtins import str as unicode
from urllib.parse import (urlencode as _urlencode, quote as _quote,
urlparse, urlunparse, unquote as _unquote)
from __future__ import annotations
import re
from urllib.parse import (urlencode as _urlencode, quote as _quote,
urlparse, urlunparse, unquote)
from typing import TYPE_CHECKING
import sys
if TYPE_CHECKING and sys.version_info > (3, 7):
from typing import Optional, Dict, List
if TYPE_CHECKING:
from typing import Optional
def urlencode(query: Dict[str, Optional[str]]) -> str:
def urlencode(query: dict[str, Optional[str]]) -> str:
"""Convert a dict of into a url-encoded (unicode) string.
This is basically a python2/python3 compatibility wrapper for the respective
......@@ -84,8 +85,8 @@ modules when they are called with only the query parameter.
}))
def make_uri_path(segments: Optional[List[str]] = None,
query: Optional[Dict[str, Optional[str]]] = None) -> str:
def make_uri_path(segments: Optional[list[str]] = None,
query: Optional[dict[str, Optional[str]]] = None) -> str:
"""Url-encode all segments, concat them with slashes and append the query.
Examples
......@@ -141,18 +142,6 @@ def parse_url(url: str):
_PATTERN = re.compile(r"^SessionToken=([^;]*);.*$")
def unquote(string) -> str:
"""unquote.
Decode an urlencoded string into a plain text string.
"""
bts = _unquote(string)
if hasattr(bts, "decode"):
# python 2
return bts.decode("utf-8")
return bts
def parse_auth_token(cookie: Optional[str]) -> Optional[str]:
"""parse_auth_token.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment