From 4e75f06bd4cd5eda4d9c365cc4fc933fc5378b8f Mon Sep 17 00:00:00 2001 From: Timm Fitschen <t.fitschen@indiscale.com> Date: Thu, 19 Nov 2020 19:29:30 +0100 Subject: [PATCH] WIP: fsm v0.2 --- tests/test_state.py | 409 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 403 insertions(+), 6 deletions(-) diff --git a/tests/test_state.py b/tests/test_state.py index 36b786b..07cdc03 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -3,9 +3,80 @@ import caosdb as db from caosdb import administration as admin _ORIGINAL_EXT_ENTITY_STATE = "" +# _DELETE_ROLES = ["reviewer", "team-leader", "normal"] +# _DELETE_USERS = ["reviewer_user", "team-leader_user", "normal_user"] +_DELETE_ROLES = [] +_DELETE_USERS = [] +_PASSWORD = "1234asdf!P" + + +def switch_to_admin_user(): + db.configure_connection() + + +def setup_users(): + for role in ["reviewer", "team-leader", "normal"]: + _DELETE_ROLES.append(role) + db.administration._insert_role(name=role, description="A test role") + + username = role + "_user" + _DELETE_USERS.append(username) + db.administration._insert_user( + name=username, + password=_PASSWORD, + status="ACTIVE") + db.administration._set_roles(username=username, roles=[role]) + + db.administration._set_permissions( + role="reviewer", permission_rules=[ + db.administration.PermissionRule( + "Grant", "TRANSACTION:*"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:EditTransition"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:ReviewTransition"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:Transition1"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:Transition4"), + + ]) + db.administration._set_permissions( + role="normal", permission_rules=[ + db.administration.PermissionRule( + "Grant", "TRANSACTION:*"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:Transition4"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:EditTransition"), + ]) + db.administration._set_permissions( + role="team-leader", permission_rules=[ + db.administration.PermissionRule( + "Grant", "TRANSACTION:*"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:*"), + ]) + + +def switch_to_test_user(role): + db.configure_connection(username=role + "_user", + password=_PASSWORD, + password_method="plain") def teardown_module(): + for user in _DELETE_USERS: + try: + db.administration._delete_user(name=user) + except BaseException: + pass + for role in _DELETE_ROLES: + try: + db.administration._delete_role(name=role) + except BaseException: + pass + d = db.execute_query("FIND ENTITY WITH ID > 99") if len(d) > 0: d.delete(flags={"forceFinalState": "true"}) @@ -21,6 +92,8 @@ def setup_module(): except KeyError: pass teardown_module() + setup_users() + db.RecordType("State").insert() db.RecordType("StateModel").insert() db.RecordType("Transition").insert() @@ -29,9 +102,19 @@ def setup_module(): db.Property(name="initial", datatype="State").insert() db.Property(name="final", datatype="State").insert() - db.Record("State1").add_parent("State").insert() - db.Record("State2").add_parent("State").insert() - db.Record("State3").add_parent("State").insert() + st1 = db.Record( + "State1", + description="DescState1").add_parent("State").insert( + flags={ + "ACL": None}) + state_acl = db.ACL() + state_acl.grant(role="role1", permission="UPDATE:DESCRIPTION") + state_acl = db.State.create_state_acl(state_acl) + st1.acl = state_acl.combine(st1.acl) + st1.update_acl() + + db.Record("State2", description="DescState2").add_parent("State").insert() + db.Record("State3", description="DescState3").add_parent("State").insert() # 1-> db.Record("Transition1").add_parent("Transition").add_property( "from", "State1").add_property("to", "State2").insert() @@ -60,6 +143,7 @@ def setup_module(): def teardown(): + switch_to_admin_user() d = db.execute_query("FIND TestRT") if len(d) > 0: d.delete(flags={"forceFinalState": "true"}) @@ -100,6 +184,7 @@ def test_plugin_disabled(): def test_state_message(): + state_id = db.execute_query("FIND Record State1", unique=True).id rec = db.Record() rec.add_parent("TestRT") rec.state = db.State(model="Model1", name="State1") @@ -108,10 +193,29 @@ def test_state_message(): rec_insert = rec.insert(sync=False) assert rec_insert.get_property("State") is None assert rec_insert.state == rec.state + assert rec_insert.state.description == "DescState1" + assert rec_insert.state.id == str(state_id) + assert rec_insert.state.transitions == set( + [db.Transition(name="Transition1", from_state="State1", to_state="State2")]) rec_retrieve = db.Record.retrieve(rec_insert.id) assert rec_retrieve.get_property("State") is None assert rec_retrieve.state == rec.state + assert rec_retrieve.state.description == "DescState1" + assert rec_retrieve.state.id == str(state_id) + assert rec_retrieve.state.transitions == set( + [db.Transition(name="Transition1", from_state="State1", to_state="State2")]) + + # test sparseState flag + rec_retrieve = db.Record( + id=rec_insert.id).retrieve( + flags={ + "sparseState": "true"}) + assert rec_retrieve.get_property("State") is None + assert rec_retrieve.state.id == str(state_id) + assert rec_retrieve.state.name is None + assert rec_retrieve.state.description is None + assert rec_retrieve.state.transitions is None def test_state_query(): @@ -141,6 +245,78 @@ def test_state_transition(): assert rec_retrieve.state == rec_update.state +def test_transition_permissions(): + rec = db.Record() + rec.add_parent("TestRT") + + rec.state = db.State(model="Model1", name="State1") + rec.insert() + + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions == set([db.Transition(name="Transition1", + from_state="State1", + to_state="State2")]) + switch_to_test_user("team-leader") + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions == set([db.Transition(name="Transition1", + from_state="State1", + to_state="State2")]) + switch_to_test_user("reviewer") + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions == set([db.Transition(name="Transition1", + from_state="State1", + to_state="State2")]) + switch_to_test_user("normal") + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions is None + + rec.state = db.State(model="Model1", name="State2") + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this" in str(exc.value) + + switch_to_test_user("reviewer") + rec.update(sync=False) + + switch_to_test_user("team-leader") + rec_state_2 = db.Record.retrieve(rec.id) + assert rec_state_2.state == db.State(model="Model1", name="State2") + assert rec_state_2.state.transitions == set([db.Transition(name="Transition2", + from_state="State2", + to_state="State3"), + db.Transition(name="Transition4", + from_state="State2", + to_state="State2")]) + + switch_to_test_user("reviewer") + rec_state_2 = db.Record.retrieve(rec.id) + assert rec_state_2.state == db.State(model="Model1", name="State2") + assert rec_state_2.state.transitions == set([db.Transition(name="Transition4", + from_state="State2", + to_state="State2")]) + + rec.state = db.State(model="Model1", name="State3") + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + + switch_to_test_user("team-leader") + rec.update(sync=False) + rec_state_3 = db.Record.retrieve(rec.id) + assert rec_state_3.state == db.State(model="Model1", name="State3") + assert rec_state_3.state.transitions == set([db.Transition(name="Transition3", + from_state="State3", + to_state="State1")]) + + switch_to_test_user("reviewer") + rec_state_3 = db.Record.retrieve(rec.id) + assert rec_state_3.state == db.State(model="Model1", name="State3") + assert rec_state_3.state.transitions is None + + def test_transition_not_allowed(): """unallowed transitions return errors and do not update the entity""" rec = db.Record() @@ -214,7 +390,6 @@ def test_multiple_states(): rec.messages = TestState() with pytest.raises(db.TransactionError): rec_insert = rec.insert(sync=False) - print(rec_insert) def test_broken_state_missing_model(): @@ -268,7 +443,6 @@ def test_transition_with_out_state_change(): # second update with transition to state2 rec_update = db.Record(id=rec_insert.id).retrieve() - print(rec_update) assert rec_update.state == db.State(model="Model1", name="State1") assert rec_update.description == "old description" @@ -279,7 +453,6 @@ def test_transition_with_out_state_change(): # third update without state change rec_update = db.Record(id=rec_insert.id).retrieve() - print(rec_update) assert rec_update.state == db.State(model="Model1", name="State2") assert rec_update.description == "updated description 2" @@ -290,3 +463,227 @@ def test_transition_with_out_state_change(): rec_final = db.Record.retrieve(rec_insert.id) assert rec_final.description == "updated description 3" assert rec_final.state == db.State(model="Model1", name="State2") + + +def test_transfer_state_acl(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + insert_rec = rec.insert(flags={"ACL": None}) + + state_acl = db.ACL() + state_acl.grant(role="role1", permission="UPDATE:DESCRIPTION") + + # the acl has been transfered from the state record + assert insert_rec.acl == state_acl + + +def test_full_edit_review_publish_cycle(): + edit_state = db.Record( + "EditState", + description="Any user can edit, only team-leader can delete.").add_parent("State").insert( + flags={ + "ACL": None}) + + edit_acl = db.ACL() + edit_acl.grant(role="team-leader", permission="*") + edit_acl.grant(role="reviewer", permission="UPDATE:*") + edit_acl.grant(role="normal", permission="UPDATE:*") + edit_acl = db.State.create_state_acl(edit_acl) + edit_state.acl = edit_acl.combine(edit_state.acl) + edit_state.update_acl() + + review_state = db.Record( + "ReviewState", + description="Only users with the 'reviewer' role can edit, only team-leader can delete.").add_parent("State").insert( + flags={ + "ACL": None}) + + review_acl = db.ACL() + review_acl.grant(role="team-leader", permission="*") + review_acl.grant(role="reviewer", permission="UPDATE:*") + review_acl = db.State.create_state_acl(review_acl) + review_state.acl = review_acl.combine(review_state.acl) + review_state.update_acl() + + published_state = db.Record( + "PublishedState", + description="Entity is read-only for everyone.").add_parent("State").insert( + flags={ + "ACL": None}) + + published_acl = db.ACL() + published_acl = db.State.create_state_acl(published_acl) + published_state.acl = published_acl.combine(published_state.acl) + published_state.update_acl() + + db.Record("EditTransition").add_parent("Transition").add_property( + "from", "EditState").add_property("to", "EditState").insert() + db.Record("StartReviewTransition").add_parent("Transition").add_property( + "from", "EditState").add_property("to", "ReviewState").insert() + db.Record("ReviewTransition").add_parent("Transition").add_property( + "from", "ReviewState").add_property("to", "ReviewState").insert() + db.Record("RejectTransition").add_parent("Transition").add_property( + "from", "ReviewState").add_property("to", "EditState").insert() + db.Record("PublishTransition").add_parent("Transition").add_property( + "from", "ReviewState").add_property("to", "PublishedState").insert() + db.Record("UnpublishTransition").add_parent("Transition").add_property( + "from", "PublishedState").add_property("to", "EditState").insert() + + db.Record("EditReviewPublish").add_parent("StateModel").add_property( + "Transition", + datatype=db.LIST("Transition"), + value=[ + "EditTransition", + "StartReviewTransition", + "ReviewTransition", + "RejectTransition", + "PublishTransition", + "UnpublishTransition"]).add_property( + "initial", + "EditState").add_property( + "final", + "EditState").insert() + db.Property("TestProperty", datatype=db.TEXT).insert() + + def val(): + s = "val" + i = 0 + while True: + i += 1 + yield s + str(i) + val = val() + # tests begin + + rec = db.Record().add_parent("TestRT") + rec.add_property("TestProperty", "val1") + rec.state = db.State(model="EditReviewPublish", name="EditState") + rec.insert() + + # as team-leader + switch_to_test_user("team-leader") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # start review + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="ReviewState") + rec.update() + + # as team-leader + switch_to_test_user("team-leader") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # reject + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="EditState") + rec.update() + + # as team-leader + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # start review + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="ReviewState") + rec.update() + + # as team-leader + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # publish + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="PublishedState") + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + # updating the property and the state fails + assert "You are not allowed to do this." in str(exc.value) + + rec = db.Record(id=rec.id).retrieve() + rec.state = db.State(model="EditReviewPublish", name="PublishedState") + rec.update(sync=False) + + # as team-leader + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # unpublish + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="EditState") + rec.update() + + # as team-leader + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + rec.update() -- GitLab