Skip to content
Snippets Groups Projects
Commit a2471d5e authored by Florian Spreckelsen's avatar Florian Spreckelsen
Browse files

Merge branch 'f-insert-auth' into 'dev'

F insert auth

See merge request !56
parents 91118aa2 c1e043e2
No related branches found
No related tags found
2 merge requests!59FIX: if multiple updates for one entity exist, the retrieve would result in an...,!56F insert auth
Pipeline #28196 passed
...@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ...@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### ### Added ###
- Unauthorized inserts can now be cached. Note that the Crawler cannot postpone
inserts but the Cache has the functionality now.
### Changed ### ### Changed ###
### Deprecated ### ### Deprecated ###
......
...@@ -17,7 +17,7 @@ OUT=/tmp/crawler.output ...@@ -17,7 +17,7 @@ OUT=/tmp/crawler.output
ls ls
cat pycaosdb.ini cat pycaosdb.ini
python3 -c "import caosdb; print('CaosDB Version:', caosdb.__version__)" python3 -c "import caosdb; print('CaosDB Version:', caosdb.__version__)"
rm -rf cache.db rm -rf /tmp/caosdb_identifiable_cache.db
set -e set -e
echo "Clearing database" echo "Clearing database"
python3 clear_database.py python3 clear_database.py
......
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
# #
# ** end header # ** end header
# TODO this is implementing a cache on client side. Should it be on # Note: This is implementing a cache on client side. It would be great if the server would provide
# server side? # something to replace this.
import os import os
import sqlite3 import sqlite3
from copy import deepcopy from copy import deepcopy
...@@ -68,6 +68,25 @@ def get_pretty_xml(cont): ...@@ -68,6 +68,25 @@ def get_pretty_xml(cont):
class AbstractCache(ABC): class AbstractCache(ABC):
def __init__(self, db_file=None, force_creation=False):
    """Set up the cache's backing database file.

    Parameters
    ----------
    db_file : str, optional
        Path of the database file. If None, a file named by
        ``get_default_file_name()`` inside the system temporary
        directory is used.
    force_creation : bool, optional
        If True, the cache file is (re-)created regardless of whether
        a file already exists at the chosen path.
    """
    if db_file is not None:
        self.db_file = db_file
    else:
        self.db_file = os.path.join(tempfile.gettempdir(),
                                    self.get_default_file_name())

    # Build a fresh cache when forced or when nothing exists on disk;
    # otherwise validate the existing file via the subclass hook.
    if force_creation or not os.path.exists(self.db_file):
        self.create_cache()
    else:
        self.check_cache()
@abstractmethod @abstractmethod
def get_cache_schema_version(self): def get_cache_schema_version(self):
""" """
...@@ -93,24 +112,6 @@ class AbstractCache(ABC): ...@@ -93,24 +112,6 @@ class AbstractCache(ABC):
""" """
pass pass
def __init__(self, db_file=None, force_creation=False):
    """Initialize the cache and choose its backing database file.

    db_file: The path of the database file. If None, the subclass's
        default file name is used; NOTE(review): that default is a
        bare file name, so it resolves relative to the current working
        directory — confirm this is intended.
    force_creation: If set to True, the file will be created
        regardless of whether a file already exists at the same path.
    """
    if db_file is None:
        self.db_file = self.get_default_file_name()
    else:
        self.db_file = db_file
    # Create a fresh cache when the file is missing or creation is
    # forced; otherwise validate the existing file.
    if not os.path.exists(self.db_file) or force_creation:
        self.create_cache()
    else:
        self.check_cache()
def check_cache(self): def check_cache(self):
""" """
Check whether the cache in db file self.db_file exists and conforms Check whether the cache in db file self.db_file exists and conforms
...@@ -172,7 +173,6 @@ class AbstractCache(ABC): ...@@ -172,7 +173,6 @@ class AbstractCache(ABC):
return results return results
# TODO: A better name would be IdentifiablesCache
class IdentifiableCache(AbstractCache): class IdentifiableCache(AbstractCache):
""" """
stores identifiables (as a hash of xml) and their respective ID. stores identifiables (as a hash of xml) and their respective ID.
...@@ -185,7 +185,7 @@ class IdentifiableCache(AbstractCache): ...@@ -185,7 +185,7 @@ class IdentifiableCache(AbstractCache):
return 2 return 2
def get_default_file_name(self): def get_default_file_name(self):
return "cache.db" return "caosdb_identifiable_cache.db"
def __init__(self, db_file=None, force_creation=False): def __init__(self, db_file=None, force_creation=False):
super().__init__(db_file, force_creation) super().__init__(db_file, force_creation)
...@@ -198,8 +198,6 @@ class IdentifiableCache(AbstractCache): ...@@ -198,8 +198,6 @@ class IdentifiableCache(AbstractCache):
- identifiables is the actual cache. - identifiables is the actual cache.
- version is a table with version information about the cache. - version is a table with version information about the cache.
""" """
conn = sqlite3.connect(self.db_file)
c = conn.cursor()
self.run_sql_commands([ self.run_sql_commands([
('''CREATE TABLE identifiables (digest TEXT PRIMARY KEY, caosdb_id INTEGER, caosdb_version TEXT)''',), ('''CREATE TABLE identifiables (digest TEXT PRIMARY KEY, caosdb_id INTEGER, caosdb_version TEXT)''',),
('''CREATE TABLE version (schema INTEGER)''',), ('''CREATE TABLE version (schema INTEGER)''',),
...@@ -325,25 +323,18 @@ class IdentifiableCache(AbstractCache): ...@@ -325,25 +323,18 @@ class IdentifiableCache(AbstractCache):
class UpdateCache(AbstractCache): class UpdateCache(AbstractCache):
""" """
stores unauthorized updates stores unauthorized inserts and updates
If the Guard is set to a mode that does not allow an update, the update can If the Guard is set to a mode that does not allow an insert or update, the insert or update can
be stored in this cache such that it can be authorized and done later. be stored in this cache such that it can be authorized and performed later.
""" """
def get_cache_schema_version(self): def get_cache_schema_version(self):
return 1 return 3
def get_default_file_name(self): def get_default_file_name(self):
return "/tmp/crawler_update_cache.db" return "/tmp/crawler_update_cache.db"
def __init__(self, db_file=None, force_creation=False):
if db_file is None:
tmppath = tempfile.gettempdir()
tmpf = os.path.join(tmppath, "crawler_update_cache.db")
db_file = tmpf
super().__init__(db_file=db_file, force_creation=force_creation)
@staticmethod @staticmethod
def get_previous_version(cont): def get_previous_version(cont):
""" Retrieve the current, unchanged version of the entities that shall """ Retrieve the current, unchanged version of the entities that shall
...@@ -357,59 +348,75 @@ class UpdateCache(AbstractCache): ...@@ -357,59 +348,75 @@ class UpdateCache(AbstractCache):
return old_ones return old_ones
def insert(self, cont, run_id): def insert(self, cont, run_id, insert=False):
"""Insert a pending, unauthorized update """Insert a pending, unauthorized insert or update
Parameters Parameters
---------- ----------
cont: Container with the records to be updated containing the desired cont: Container with the records to be inserted or updated containing the desired
version, i.e. the state after the update. version, i.e. the state after the update.
run_id: int run_id: int
The id of the crawler run The id of the crawler run
insert: bool
Whether the entities in the container shall be inserted or updated.
""" """
cont = put_in_container(cont) cont = put_in_container(cont)
old_ones = UpdateCache.get_previous_version(cont)
if insert:
old_ones = ""
else:
old_ones = UpdateCache.get_previous_version(cont)
new_ones = cont new_ones = cont
old_hash = Cache.hash_entity(old_ones) if insert:
old_hash = ""
else:
old_hash = Cache.hash_entity(old_ones)
new_hash = Cache.hash_entity(new_ones) new_hash = Cache.hash_entity(new_ones)
conn = sqlite3.connect(self.db_file) self.run_sql_commands([('''INSERT INTO updates VALUES (?, ?, ?, ?, ?)''',
c = conn.cursor() (old_hash, new_hash, str(old_ones), str(new_ones),
c.execute('''INSERT INTO updates VALUES (?, ?, ?, ?, ?)''', str(run_id)))])
(old_hash, new_hash, str(old_ones), str(new_ones),
str(run_id)))
conn.commit()
conn.close()
def create_cache(self): def create_cache(self):
""" initialize the cache """ """ initialize the cache """
conn = sqlite3.connect(self.db_file) self.run_sql_commands([
c = conn.cursor() ('''CREATE TABLE updates (olddigest TEXT, newdigest TEXT, oldrep TEXT,
c.execute('''CREATE TABLE updates (olddigest text, newdigest text, newrep TEXT, run_id TEXT, primary key (olddigest, newdigest, run_id))''',),
oldrep text, newrep text, run_id text, ('''CREATE TABLE version (schema INTEGER)''',),
primary key (olddigest, newdigest, run_id))''') ("INSERT INTO version VALUES (?)", (self.get_cache_schema_version(),))])
conn.commit()
conn.close()
def get_updates(self, run_id): def get(self, run_id, querystring):
""" returns the pending updates for a given run id """ returns the pending updates for a given run id
Parameters: Parameters:
----------- -----------
run_id: the id of the crawler run run_id: the id of the crawler run
querystring: the sql query
""" """
conn = sqlite3.connect(self.db_file) return self.run_sql_commands([(querystring, (str(run_id),))], fetchall=True)
c = conn.cursor()
c.execute('''Select * FROM updates WHERE run_id=?''', def get_inserts(self, run_id):
(str(run_id),)) """ returns the pending updates for a given run id
res = c.fetchall()
conn.commit() Parameters:
conn.close() -----------
run_id: the id of the crawler run
"""
return self.get(run_id, '''Select * FROM updates WHERE olddigest='' AND run_id=?''')
def get_updates(self, run_id):
""" returns the pending updates for a given run id
Parameters:
-----------
run_id: the id of the crawler run
"""
return res return self.get(run_id, '''Select * FROM updates WHERE olddigest!='' AND run_id=?''')
class Cache(IdentifiableCache): class Cache(IdentifiableCache):
......
...@@ -209,28 +209,70 @@ class Crawler(object): ...@@ -209,28 +209,70 @@ class Crawler(object):
run_id: the id of the crawler run run_id: the id of the crawler run
""" """
cache = UpdateCache() cache = UpdateCache()
inserts = cache.get_inserts(run_id)
all_inserts = 0
all_updates = 0
for _, _, _, new, _ in inserts:
new_cont = db.Container()
new_cont = new_cont.from_xml(new)
new_cont.insert(unique=False)
logger.info("Successfully inserted {} records!".format(len(new_cont)))
all_inserts += len(new_cont)
logger.info("Finished with authorized updates.")
changes = cache.get_updates(run_id) changes = cache.get_updates(run_id)
for _, _, old, new, _ in changes: for _, _, old, new, _ in changes:
current = db.Container()
new_cont = db.Container() new_cont = db.Container()
new_cont = new_cont.from_xml(new) new_cont = new_cont.from_xml(new)
ids = []
tmp = []
update_incomplete = False
# remove duplicate entities
for el in new_cont:
if el.id not in ids:
ids.append(el.id)
tmp.append(el)
else:
update_incomplete = True
new_cont = tmp
if new[0].version:
valids = db.Container()
nonvalids = db.Container()
for ent in new_cont:
remote_ent = db.Entity(id=ent.id).retrieve()
if ent.version == remote_ent.version:
valids.append(remote_ent)
else:
update_incomplete = True
nonvalids.append(remote_ent)
valids.update(unique=False)
logger.info("Successfully updated {} records!".format(
len(valids)))
logger.info("{} Records were not updated because the version in the server "
"changed!".format(len(nonvalids)))
all_updates += len(valids)
else:
current = db.Container()
for ent in new_cont: for ent in new_cont:
current.append(db.execute_query("FIND {}".format(ent.id), current.append(db.Entity(id=ent.id).retrieve())
unique=True)) current_xml = get_pretty_xml(current)
current_xml = get_pretty_xml(current)
# check whether previous version equals current version # check whether previous version equals current version
# if not, the update must not be done # if not, the update must not be done
if current_xml != old: if current_xml != old:
continue continue
new_cont.update(unique=False) new_cont.update(unique=False)
logger.info("Successfully updated {} records!".format( logger.info("Successfully updated {} records!".format(
len(new_cont))) len(new_cont)))
all_updates += len(new_cont)
logger.info("Some updates could not be applied. Crawler has to rerun.")
logger.info("Finished with authorized updates.") logger.info("Finished with authorized updates.")
return all_inserts, all_updates
def collect_cfoods(self): def collect_cfoods(self):
""" """
......
...@@ -42,8 +42,8 @@ class CacheTest(unittest.TestCase): ...@@ -42,8 +42,8 @@ class CacheTest(unittest.TestCase):
return c return c
def setUp(self): def setUp(self):
self.cache = UpdateCache(db_file=NamedTemporaryFile(delete=False).name) self.cache = UpdateCache(db_file=NamedTemporaryFile(delete=False).name,
self.cache.create_cache() force_creation=True)
self.run_id = "235234" self.run_id = "235234"
def test_insert(self): def test_insert(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment