Skip to content
Snippets Groups Projects
Verified Commit 4e75f06b authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP: fsm v0.2

parent dd81d45a
No related branches found
No related tags found
1 merge request!3F fsm
...@@ -3,9 +3,80 @@ import caosdb as db ...@@ -3,9 +3,80 @@ import caosdb as db
from caosdb import administration as admin from caosdb import administration as admin
_ORIGINAL_EXT_ENTITY_STATE = "" _ORIGINAL_EXT_ENTITY_STATE = ""
# _DELETE_ROLES = ["reviewer", "team-leader", "normal"]
# _DELETE_USERS = ["reviewer_user", "team-leader_user", "normal_user"]
_DELETE_ROLES = []
_DELETE_USERS = []
_PASSWORD = "1234asdf!P"
def switch_to_admin_user():
db.configure_connection()
def setup_users():
for role in ["reviewer", "team-leader", "normal"]:
_DELETE_ROLES.append(role)
db.administration._insert_role(name=role, description="A test role")
username = role + "_user"
_DELETE_USERS.append(username)
db.administration._insert_user(
name=username,
password=_PASSWORD,
status="ACTIVE")
db.administration._set_roles(username=username, roles=[role])
db.administration._set_permissions(
role="reviewer", permission_rules=[
db.administration.PermissionRule(
"Grant", "TRANSACTION:*"),
db.administration.PermissionRule(
"Grant", "STATE:TRANSITION:EditTransition"),
db.administration.PermissionRule(
"Grant", "STATE:TRANSITION:ReviewTransition"),
db.administration.PermissionRule(
"Grant", "STATE:TRANSITION:Transition1"),
db.administration.PermissionRule(
"Grant", "STATE:TRANSITION:Transition4"),
])
db.administration._set_permissions(
role="normal", permission_rules=[
db.administration.PermissionRule(
"Grant", "TRANSACTION:*"),
db.administration.PermissionRule(
"Grant", "STATE:TRANSITION:Transition4"),
db.administration.PermissionRule(
"Grant", "STATE:TRANSITION:EditTransition"),
])
db.administration._set_permissions(
role="team-leader", permission_rules=[
db.administration.PermissionRule(
"Grant", "TRANSACTION:*"),
db.administration.PermissionRule(
"Grant", "STATE:TRANSITION:*"),
])
def switch_to_test_user(role):
db.configure_connection(username=role + "_user",
password=_PASSWORD,
password_method="plain")
def teardown_module(): def teardown_module():
for user in _DELETE_USERS:
try:
db.administration._delete_user(name=user)
except BaseException:
pass
for role in _DELETE_ROLES:
try:
db.administration._delete_role(name=role)
except BaseException:
pass
d = db.execute_query("FIND ENTITY WITH ID > 99") d = db.execute_query("FIND ENTITY WITH ID > 99")
if len(d) > 0: if len(d) > 0:
d.delete(flags={"forceFinalState": "true"}) d.delete(flags={"forceFinalState": "true"})
...@@ -21,6 +92,8 @@ def setup_module(): ...@@ -21,6 +92,8 @@ def setup_module():
except KeyError: except KeyError:
pass pass
teardown_module() teardown_module()
setup_users()
db.RecordType("State").insert() db.RecordType("State").insert()
db.RecordType("StateModel").insert() db.RecordType("StateModel").insert()
db.RecordType("Transition").insert() db.RecordType("Transition").insert()
...@@ -29,9 +102,19 @@ def setup_module(): ...@@ -29,9 +102,19 @@ def setup_module():
db.Property(name="initial", datatype="State").insert() db.Property(name="initial", datatype="State").insert()
db.Property(name="final", datatype="State").insert() db.Property(name="final", datatype="State").insert()
db.Record("State1").add_parent("State").insert() st1 = db.Record(
db.Record("State2").add_parent("State").insert() "State1",
db.Record("State3").add_parent("State").insert() description="DescState1").add_parent("State").insert(
flags={
"ACL": None})
state_acl = db.ACL()
state_acl.grant(role="role1", permission="UPDATE:DESCRIPTION")
state_acl = db.State.create_state_acl(state_acl)
st1.acl = state_acl.combine(st1.acl)
st1.update_acl()
db.Record("State2", description="DescState2").add_parent("State").insert()
db.Record("State3", description="DescState3").add_parent("State").insert()
# 1-> # 1->
db.Record("Transition1").add_parent("Transition").add_property( db.Record("Transition1").add_parent("Transition").add_property(
"from", "State1").add_property("to", "State2").insert() "from", "State1").add_property("to", "State2").insert()
...@@ -60,6 +143,7 @@ def setup_module(): ...@@ -60,6 +143,7 @@ def setup_module():
def teardown(): def teardown():
switch_to_admin_user()
d = db.execute_query("FIND TestRT") d = db.execute_query("FIND TestRT")
if len(d) > 0: if len(d) > 0:
d.delete(flags={"forceFinalState": "true"}) d.delete(flags={"forceFinalState": "true"})
...@@ -100,6 +184,7 @@ def test_plugin_disabled(): ...@@ -100,6 +184,7 @@ def test_plugin_disabled():
def test_state_message(): def test_state_message():
state_id = db.execute_query("FIND Record State1", unique=True).id
rec = db.Record() rec = db.Record()
rec.add_parent("TestRT") rec.add_parent("TestRT")
rec.state = db.State(model="Model1", name="State1") rec.state = db.State(model="Model1", name="State1")
...@@ -108,10 +193,29 @@ def test_state_message(): ...@@ -108,10 +193,29 @@ def test_state_message():
rec_insert = rec.insert(sync=False) rec_insert = rec.insert(sync=False)
assert rec_insert.get_property("State") is None assert rec_insert.get_property("State") is None
assert rec_insert.state == rec.state assert rec_insert.state == rec.state
assert rec_insert.state.description == "DescState1"
assert rec_insert.state.id == str(state_id)
assert rec_insert.state.transitions == set(
[db.Transition(name="Transition1", from_state="State1", to_state="State2")])
rec_retrieve = db.Record.retrieve(rec_insert.id) rec_retrieve = db.Record.retrieve(rec_insert.id)
assert rec_retrieve.get_property("State") is None assert rec_retrieve.get_property("State") is None
assert rec_retrieve.state == rec.state assert rec_retrieve.state == rec.state
assert rec_retrieve.state.description == "DescState1"
assert rec_retrieve.state.id == str(state_id)
assert rec_retrieve.state.transitions == set(
[db.Transition(name="Transition1", from_state="State1", to_state="State2")])
# test sparseState flag
rec_retrieve = db.Record(
id=rec_insert.id).retrieve(
flags={
"sparseState": "true"})
assert rec_retrieve.get_property("State") is None
assert rec_retrieve.state.id == str(state_id)
assert rec_retrieve.state.name is None
assert rec_retrieve.state.description is None
assert rec_retrieve.state.transitions is None
def test_state_query(): def test_state_query():
...@@ -141,6 +245,78 @@ def test_state_transition(): ...@@ -141,6 +245,78 @@ def test_state_transition():
assert rec_retrieve.state == rec_update.state assert rec_retrieve.state == rec_update.state
def test_transition_permissions():
rec = db.Record()
rec.add_parent("TestRT")
rec.state = db.State(model="Model1", name="State1")
rec.insert()
rec_state_1 = db.Record.retrieve(rec.id)
assert rec_state_1.state == db.State(model="Model1", name="State1")
assert rec_state_1.state.transitions == set([db.Transition(name="Transition1",
from_state="State1",
to_state="State2")])
switch_to_test_user("team-leader")
rec_state_1 = db.Record.retrieve(rec.id)
assert rec_state_1.state == db.State(model="Model1", name="State1")
assert rec_state_1.state.transitions == set([db.Transition(name="Transition1",
from_state="State1",
to_state="State2")])
switch_to_test_user("reviewer")
rec_state_1 = db.Record.retrieve(rec.id)
assert rec_state_1.state == db.State(model="Model1", name="State1")
assert rec_state_1.state.transitions == set([db.Transition(name="Transition1",
from_state="State1",
to_state="State2")])
switch_to_test_user("normal")
rec_state_1 = db.Record.retrieve(rec.id)
assert rec_state_1.state == db.State(model="Model1", name="State1")
assert rec_state_1.state.transitions is None
rec.state = db.State(model="Model1", name="State2")
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
assert "You are not allowed to do this" in str(exc.value)
switch_to_test_user("reviewer")
rec.update(sync=False)
switch_to_test_user("team-leader")
rec_state_2 = db.Record.retrieve(rec.id)
assert rec_state_2.state == db.State(model="Model1", name="State2")
assert rec_state_2.state.transitions == set([db.Transition(name="Transition2",
from_state="State2",
to_state="State3"),
db.Transition(name="Transition4",
from_state="State2",
to_state="State2")])
switch_to_test_user("reviewer")
rec_state_2 = db.Record.retrieve(rec.id)
assert rec_state_2.state == db.State(model="Model1", name="State2")
assert rec_state_2.state.transitions == set([db.Transition(name="Transition4",
from_state="State2",
to_state="State2")])
rec.state = db.State(model="Model1", name="State3")
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
switch_to_test_user("team-leader")
rec.update(sync=False)
rec_state_3 = db.Record.retrieve(rec.id)
assert rec_state_3.state == db.State(model="Model1", name="State3")
assert rec_state_3.state.transitions == set([db.Transition(name="Transition3",
from_state="State3",
to_state="State1")])
switch_to_test_user("reviewer")
rec_state_3 = db.Record.retrieve(rec.id)
assert rec_state_3.state == db.State(model="Model1", name="State3")
assert rec_state_3.state.transitions is None
def test_transition_not_allowed(): def test_transition_not_allowed():
"""unallowed transitions return errors and do not update the entity""" """unallowed transitions return errors and do not update the entity"""
rec = db.Record() rec = db.Record()
...@@ -214,7 +390,6 @@ def test_multiple_states(): ...@@ -214,7 +390,6 @@ def test_multiple_states():
rec.messages = TestState() rec.messages = TestState()
with pytest.raises(db.TransactionError): with pytest.raises(db.TransactionError):
rec_insert = rec.insert(sync=False) rec_insert = rec.insert(sync=False)
print(rec_insert)
def test_broken_state_missing_model(): def test_broken_state_missing_model():
...@@ -268,7 +443,6 @@ def test_transition_with_out_state_change(): ...@@ -268,7 +443,6 @@ def test_transition_with_out_state_change():
# second update with transition to state2 # second update with transition to state2
rec_update = db.Record(id=rec_insert.id).retrieve() rec_update = db.Record(id=rec_insert.id).retrieve()
print(rec_update)
assert rec_update.state == db.State(model="Model1", name="State1") assert rec_update.state == db.State(model="Model1", name="State1")
assert rec_update.description == "old description" assert rec_update.description == "old description"
...@@ -279,7 +453,6 @@ def test_transition_with_out_state_change(): ...@@ -279,7 +453,6 @@ def test_transition_with_out_state_change():
# third update without state change # third update without state change
rec_update = db.Record(id=rec_insert.id).retrieve() rec_update = db.Record(id=rec_insert.id).retrieve()
print(rec_update)
assert rec_update.state == db.State(model="Model1", name="State2") assert rec_update.state == db.State(model="Model1", name="State2")
assert rec_update.description == "updated description 2" assert rec_update.description == "updated description 2"
...@@ -290,3 +463,227 @@ def test_transition_with_out_state_change(): ...@@ -290,3 +463,227 @@ def test_transition_with_out_state_change():
rec_final = db.Record.retrieve(rec_insert.id) rec_final = db.Record.retrieve(rec_insert.id)
assert rec_final.description == "updated description 3" assert rec_final.description == "updated description 3"
assert rec_final.state == db.State(model="Model1", name="State2") assert rec_final.state == db.State(model="Model1", name="State2")
def test_transfer_state_acl():
rec = db.Record()
rec.add_parent("TestRT")
rec.state = db.State(model="Model1", name="State1")
insert_rec = rec.insert(flags={"ACL": None})
state_acl = db.ACL()
state_acl.grant(role="role1", permission="UPDATE:DESCRIPTION")
# the acl has been transfered from the state record
assert insert_rec.acl == state_acl
def test_full_edit_review_publish_cycle():
edit_state = db.Record(
"EditState",
description="Any user can edit, only team-leader can delete.").add_parent("State").insert(
flags={
"ACL": None})
edit_acl = db.ACL()
edit_acl.grant(role="team-leader", permission="*")
edit_acl.grant(role="reviewer", permission="UPDATE:*")
edit_acl.grant(role="normal", permission="UPDATE:*")
edit_acl = db.State.create_state_acl(edit_acl)
edit_state.acl = edit_acl.combine(edit_state.acl)
edit_state.update_acl()
review_state = db.Record(
"ReviewState",
description="Only users with the 'reviewer' role can edit, only team-leader can delete.").add_parent("State").insert(
flags={
"ACL": None})
review_acl = db.ACL()
review_acl.grant(role="team-leader", permission="*")
review_acl.grant(role="reviewer", permission="UPDATE:*")
review_acl = db.State.create_state_acl(review_acl)
review_state.acl = review_acl.combine(review_state.acl)
review_state.update_acl()
published_state = db.Record(
"PublishedState",
description="Entity is read-only for everyone.").add_parent("State").insert(
flags={
"ACL": None})
published_acl = db.ACL()
published_acl = db.State.create_state_acl(published_acl)
published_state.acl = published_acl.combine(published_state.acl)
published_state.update_acl()
db.Record("EditTransition").add_parent("Transition").add_property(
"from", "EditState").add_property("to", "EditState").insert()
db.Record("StartReviewTransition").add_parent("Transition").add_property(
"from", "EditState").add_property("to", "ReviewState").insert()
db.Record("ReviewTransition").add_parent("Transition").add_property(
"from", "ReviewState").add_property("to", "ReviewState").insert()
db.Record("RejectTransition").add_parent("Transition").add_property(
"from", "ReviewState").add_property("to", "EditState").insert()
db.Record("PublishTransition").add_parent("Transition").add_property(
"from", "ReviewState").add_property("to", "PublishedState").insert()
db.Record("UnpublishTransition").add_parent("Transition").add_property(
"from", "PublishedState").add_property("to", "EditState").insert()
db.Record("EditReviewPublish").add_parent("StateModel").add_property(
"Transition",
datatype=db.LIST("Transition"),
value=[
"EditTransition",
"StartReviewTransition",
"ReviewTransition",
"RejectTransition",
"PublishTransition",
"UnpublishTransition"]).add_property(
"initial",
"EditState").add_property(
"final",
"EditState").insert()
db.Property("TestProperty", datatype=db.TEXT).insert()
def val():
s = "val"
i = 0
while True:
i += 1
yield s + str(i)
val = val()
# tests begin
rec = db.Record().add_parent("TestRT")
rec.add_property("TestProperty", "val1")
rec.state = db.State(model="EditReviewPublish", name="EditState")
rec.insert()
# as team-leader
switch_to_test_user("team-leader")
rec.get_property("TestProperty").value = next(val)
rec.update()
# as reviewer
switch_to_test_user("reviewer")
rec.get_property("TestProperty").value = next(val)
rec.update()
# as other user
switch_to_test_user("normal")
rec.get_property("TestProperty").value = next(val)
rec.update()
# start review
switch_to_test_user("team-leader")
rec.state = db.State(model="EditReviewPublish", name="ReviewState")
rec.update()
# as team-leader
switch_to_test_user("team-leader")
rec.get_property("TestProperty").value = next(val)
rec.update()
# as reviewer
switch_to_test_user("reviewer")
rec.get_property("TestProperty").value = next(val)
rec.update()
# as other user
switch_to_test_user("normal")
rec.get_property("TestProperty").value = next(val)
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
assert "You are not allowed to do this." in str(exc.value)
# reject
switch_to_test_user("team-leader")
rec.state = db.State(model="EditReviewPublish", name="EditState")
rec.update()
# as team-leader
rec.get_property("TestProperty").value = next(val)
rec.update()
# as reviewer
switch_to_test_user("reviewer")
rec.get_property("TestProperty").value = next(val)
rec.update()
# as other user
switch_to_test_user("normal")
rec.get_property("TestProperty").value = next(val)
rec.update()
# start review
switch_to_test_user("team-leader")
rec.state = db.State(model="EditReviewPublish", name="ReviewState")
rec.update()
# as team-leader
rec.get_property("TestProperty").value = next(val)
rec.update()
# as reviewer
switch_to_test_user("reviewer")
rec.get_property("TestProperty").value = next(val)
rec.update()
# as other user
switch_to_test_user("normal")
rec.get_property("TestProperty").value = next(val)
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
assert "You are not allowed to do this." in str(exc.value)
# publish
switch_to_test_user("team-leader")
rec.state = db.State(model="EditReviewPublish", name="PublishedState")
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
# updating the property and the state fails
assert "You are not allowed to do this." in str(exc.value)
rec = db.Record(id=rec.id).retrieve()
rec.state = db.State(model="EditReviewPublish", name="PublishedState")
rec.update(sync=False)
# as team-leader
rec.get_property("TestProperty").value = next(val)
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
assert "You are not allowed to do this." in str(exc.value)
# as reviewer
switch_to_test_user("reviewer")
rec.get_property("TestProperty").value = next(val)
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
assert "You are not allowed to do this." in str(exc.value)
# as other user
switch_to_test_user("normal")
rec.get_property("TestProperty").value = next(val)
with pytest.raises(db.TransactionError) as exc:
rec.update(sync=False)
assert "You are not allowed to do this." in str(exc.value)
# unpublish
switch_to_test_user("team-leader")
rec.state = db.State(model="EditReviewPublish", name="EditState")
rec.update()
# as team-leader
rec.get_property("TestProperty").value = next(val)
rec.update()
# as reviewer
switch_to_test_user("reviewer")
rec.get_property("TestProperty").value = next(val)
rec.update()
# as other user
switch_to_test_user("normal")
rec.get_property("TestProperty").value = next(val)
rec.update()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment