diff --git a/CHANGELOG.md b/CHANGELOG.md index c1fa88411d45d57c7fc0517f1d02f33dc497c1aa..54331f5ebb34f4b9af7c3592358a62dc5e1d5b2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added (for new features) +* Tests for entity state [caosdb-server!62](https://gitlab.com/caosdb/caosdb-server/-/merge_requests/62) * Tests for version history * Tests for inheritance bug (caosdb/caosdb-server!54) * Tests for versioning diff --git a/Makefile b/Makefile index b2b57fc5eb95a30f104242bb26f79fb036e5c683..9689f1e3b68d0bd7f501879bcb6c854c0153a88e 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,5 @@ help: autopep8: autopep8 -ri tests - # Meta target to call the other targets. all: autopep8 test diff --git a/tests/test_affiliation.py b/tests/test_affiliation.py index bdd73d117488ceb09210fdcdc0a92ba3b0794c3e..70a619c5662fa6afb0e4bf949afae720322ecbd2 100644 --- a/tests/test_affiliation.py +++ b/tests/test_affiliation.py @@ -65,7 +65,6 @@ def teardown(): def test_affiliation_there(): par = db.RecordType(name="TestRT1") ch = db.Record(name="TestRec").add_parent(par) - print(ch) assert_is_not_none(ch.get_parent("TestRT1")) assert_true(hasattr(ch.get_parent("TestRT1"), "affiliation")) diff --git a/tests/test_state.py b/tests/test_state.py new file mode 100644 index 0000000000000000000000000000000000000000..bbe517378403ddf97c5a898a656a20ddc9bdfa28 --- /dev/null +++ b/tests/test_state.py @@ -0,0 +1,801 @@ +import pytest +import caosdb as db +from caosdb import administration as admin + +_ORIGINAL_EXT_ENTITY_STATE = "" +_DELETE_ROLES = ["reviewer", "team-leader", "normal"] +_DELETE_USERS = ["reviewer_user", "team-leader_user", "normal_user"] +# _DELETE_ROLES = [] +# _DELETE_USERS = [] +_PASSWORD = "1234asdf!P" + + +def switch_to_admin_user(): + db.configure_connection() + + +def setup_users(): + for role in ["reviewer", "team-leader", "normal"]: + _DELETE_ROLES.append(role) + db.administration._insert_role(name=role, description="A test role") + + username = role + "_user" + _DELETE_USERS.append(username) + db.administration._insert_user( + name=username, + password=_PASSWORD, + status="ACTIVE") + db.administration._set_roles(username=username, roles=[role]) + + db.administration._set_permissions( + role="reviewer", permission_rules=[ + db.administration.PermissionRule( + "Grant", "TRANSACTION:*"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:EditTransition"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:ReviewTransition"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:Transition1"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:Transition4"), + + ]) + db.administration._set_permissions( + role="normal", permission_rules=[ + db.administration.PermissionRule( + "Grant", "TRANSACTION:*"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:Transition4"), + db.administration.PermissionRule( + "Grant", "STATE:TRANSITION:EditTransition"), + ]) + db.administration._set_permissions( + role="team-leader", permission_rules=[ + db.administration.PermissionRule( + "Grant", "TRANSACTION:*"), + db.administration.PermissionRule( + "Grant", "STATE:*"), + ]) + + +def switch_to_test_user(role): + db.configure_connection(username=role + "_user", + password=_PASSWORD, + password_method="plain") + + +def teardown_module(): + for user in _DELETE_USERS: + try: + db.administration._delete_user(name=user) + except BaseException: + pass + for role in _DELETE_ROLES: + try: + db.administration._delete_role(name=role) + except BaseException: + pass + + d = db.execute_query("FIND ENTITY WITH ID > 99") + if len(d) > 0: + d.delete(flags={"forceFinalState": "true"}) + admin.set_server_property("EXT_ENTITY_STATE", + _ORIGINAL_EXT_ENTITY_STATE) + + +def setup_module(): + global _ORIGINAL_EXT_ENTITY_STATE + try: + _ORIGINAL_EXT_ENTITY_STATE = admin.get_server_property( + "EXT_ENTITY_STATE") + except KeyError: + pass + teardown_module() + setup_users() + + db.RecordType("State").insert() + db.RecordType("StateModel").insert() + db.RecordType("Transition").insert() + db.Property(name="from", datatype="State").insert() + db.Property(name="to", datatype="State").insert() + db.Property(name="initial", datatype="State").insert() + db.Property(name="final", datatype="State").insert() + + st1 = db.Record( + "State1", + description="DescState1").add_parent("State").insert( + flags={ + "ACL": None}) + state_acl = db.ACL() + state_acl.grant(role="role1", permission="UPDATE:DESCRIPTION") + state_acl = db.State.create_state_acl(state_acl) + st1.acl = state_acl.combine(st1.acl) + st1.update_acl() + + db.Record("State2", description="DescState2").add_parent("State").insert() + db.Record("State3", description="DescState3").add_parent("State").insert() + # 1-> + db.Record("Transition1").add_parent("Transition").add_property( + "from", "State1").add_property("to", "State2").insert() + # 2->3 + db.Record("Transition2").add_parent("Transition").add_property( + "from", "State2").add_property("to", "State3").insert() + # 3->1 + db.Record("Transition3").add_parent("Transition").add_property( + "from", "State3").add_property("to", "State1").insert() + # 2->2 + db.Record("Transition4").add_parent("Transition").add_property( + "from", "State2").add_property("to", "State2").insert() + + db.Record("Model1").add_parent("StateModel").add_property( + "Transition", + datatype=db.LIST("Transition"), + value=[ + "Transition1", + "Transition2", + "Transition3", + "Transition4"]).add_property( + "initial", + "State1").add_property( + "final", + "State1").insert() + + +def teardown(): + switch_to_admin_user() + d = db.execute_query("FIND TestRT") + if len(d) > 0: + d.delete(flags={"forceFinalState": "true"}) + + +def setup(): + admin.set_server_property("EXT_ENTITY_STATE", "ENABLED") + teardown() + db.RecordType("TestRT").insert() + + +def test_plugin_disabled(): + """Test the behavior when the state machine extension is disabled on the server.""" + admin.set_server_property("EXT_ENTITY_STATE", "DISABLED") + + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + assert rec.get_property("State") is None + + rec_insert = rec.insert(sync=False) + assert rec_insert.get_property("State") is None + assert rec_insert.state == rec.state + + # however, state was not stored + rec_retrieve = db.Record.retrieve(rec_insert.id) + assert rec_retrieve.get_property("State") is None + assert rec_retrieve.state is None + + # also, properties are not interpreted as state + rec = db.Record() + rec.add_parent("TestRT") + rec.add_property("State", "State2") + rec_insert = rec.insert(sync=False) + + rec_retrieve = db.Record.retrieve(rec_insert.id) + assert rec_retrieve.get_property("State").value is not None + assert rec_retrieve.state is None + + +def test_state_message(): + """State is constant between creation, insertion, retrieval.""" + state_id = db.execute_query("FIND Record State1", unique=True).id + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + assert rec.get_property("State") is None + + rec_insert = rec.insert(sync=False) + assert rec_insert.get_property("State") is None + assert rec_insert.state == rec.state + assert rec_insert.state.description == "DescState1" + assert rec_insert.state.id == str(state_id) + assert rec_insert.state.transitions == set( + [db.Transition(name="Transition1", from_state="State1", to_state="State2")]) + + rec_retrieve = db.Record.retrieve(rec_insert.id) + assert rec_retrieve.get_property("State") is None + assert rec_retrieve.state == rec.state + assert rec_retrieve.state.description == "DescState1" + assert rec_retrieve.state.id == str(state_id) + assert rec_retrieve.state.transitions == set( + [db.Transition(name="Transition1", from_state="State1", to_state="State2")]) + + # test sparseState flag + rec_retrieve = db.Record( + id=rec_insert.id).retrieve( + flags={ + "sparseState": "true"}) + assert rec_retrieve.get_property("State") is None + assert rec_retrieve.state.id == str(state_id) + assert rec_retrieve.state.name is None + assert rec_retrieve.state.description is None + assert rec_retrieve.state.transitions is None + + +def test_state_query(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec.insert() + assert rec.get_property("State") is None + + result = db.execute_query("FIND TestRT WITH State = State1", unique=True) + assert result.id == rec.id + + assert result.get_property("State") is None + assert result.state == db.State(model="Model1", name="State1") + + +def test_state_transition(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec.insert() + + rec.state = db.State(model="Model1", name="State2") + rec_update = rec.update(sync=False) + + rec_retrieve = db.Record.retrieve(rec.id) + assert rec_retrieve.state == rec_update.state + + +def test_transition_permissions(): + rec = db.Record() + rec.add_parent("TestRT") + + rec.state = db.State(model="Model1", name="State1") + rec.insert() + + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions == set([db.Transition(name="Transition1", + from_state="State1", + to_state="State2")]) + switch_to_test_user("team-leader") + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions == set([db.Transition(name="Transition1", + from_state="State1", + to_state="State2")]) + switch_to_test_user("reviewer") + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions == set([db.Transition(name="Transition1", + from_state="State1", + to_state="State2")]) + switch_to_test_user("normal") + rec_state_1 = db.Record.retrieve(rec.id) + assert rec_state_1.state == db.State(model="Model1", name="State1") + assert rec_state_1.state.transitions is None + + rec.state = db.State(model="Model1", name="State2") + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this" in str(exc.value) + + switch_to_test_user("reviewer") + rec.update(sync=False) + + switch_to_test_user("team-leader") + rec_state_2 = db.Record.retrieve(rec.id) + assert rec_state_2.state == db.State(model="Model1", name="State2") + assert rec_state_2.state.transitions == set([db.Transition(name="Transition2", + from_state="State2", + to_state="State3"), + db.Transition(name="Transition4", + from_state="State2", + to_state="State2")]) + + switch_to_test_user("reviewer") + rec_state_2 = db.Record.retrieve(rec.id) + assert rec_state_2.state == db.State(model="Model1", name="State2") + assert rec_state_2.state.transitions == set([db.Transition(name="Transition4", + from_state="State2", + to_state="State2")]) + + rec.state = db.State(model="Model1", name="State3") + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + + switch_to_test_user("team-leader") + rec.update(sync=False) + rec_state_3 = db.Record.retrieve(rec.id) + assert rec_state_3.state == db.State(model="Model1", name="State3") + assert rec_state_3.state.transitions == set([db.Transition(name="Transition3", + from_state="State3", + to_state="State1")]) + + switch_to_test_user("reviewer") + rec_state_3 = db.Record.retrieve(rec.id) + assert rec_state_3.state == db.State(model="Model1", name="State3") + assert rec_state_3.state.transitions is None + + +def test_transition_not_allowed(): + """Unallowed transitions return errors and do not update the entity.""" + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec_insert = rec.insert(sync=False) + + rec_insert.state = db.State(model="Model1", name="State3") + with pytest.raises(db.TransactionError): + rec_update = rec_insert.update( + sync=False, raise_exception_on_error=False) + assert len(rec_update.get_errors()) == 1 + assert rec_update.get_errors( + )[0].description == "Transition not allowed." + db.common.models.raise_errors(rec_update) + + rec_retrieve = db.Record.retrieve(rec_insert.id) + assert rec_retrieve.state == rec.state + + +def test_wrong_initial(): + """the first state has to be an initial state""" + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State2") + with pytest.raises(db.TransactionError): + rec.insert() + + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "Initial state not allowed." + + +def test_wrong_final(): + """deletion of the entity or the state is only possible in final states""" + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec.insert() + + rec.state = db.State(model="Model1", name="State2") + rec.update() + + with pytest.raises(db.TransactionError): + rec.delete() + + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "Final state not allowed." + + rec.state = db.State(model="Model1", name="State3") + rec.update() + rec.state = db.State(model="Model1", name="State1") + rec.update() + rec.delete() + + +def test_multiple_states(): + """currently, only one state is allowed""" + rec = db.Record() + rec.add_parent("TestRT") + + state1 = db.State(model="Model1", name="State1") + state2 = db.State(model="Model1", name="State2") + + class TestState: + def to_xml(self, xml): + xml.append(state1.to_xml()) + xml.append(state2.to_xml()) + + def clear_server_messages(self): + pass + rec.messages = TestState() + with pytest.raises(db.TransactionError): + rec_insert = rec.insert(sync=False) + + +def test_broken_state_missing_model(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(name="State1", model=None) + with pytest.raises(db.TransactionError): + rec.insert() + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "State model not specified." + + +def test_broken_state_missing_state_name(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name=None) + with pytest.raises(db.TransactionError): + rec.insert() + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "State not specified." + + +def test_state_not_in_state_model(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="UnknownState") + with pytest.raises(db.TransactionError): + rec.insert() + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[ + 0].description == "State does not exist in this StateModel." + + +def test_transition_without_state_change(): + rec = db.Record() + rec.description = "old description" + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec_insert = rec.insert(sync=False) + + # first update attempt (should fail, because 1->1 not allowed) + rec_insert.description = "updated description 1" + with pytest.raises(db.TransactionError): + # transition 1 -> 1 not allowed + rec_update = rec_insert.update( + sync=False, raise_exception_on_error=False) + assert len(rec_update.get_errors()) == 1 + assert rec_update.get_errors( + )[0].description == "Transition not allowed." + db.common.models.raise_errors(rec_update) + + # second update with transition to state2 + rec_update = db.Record(id=rec_insert.id).retrieve() + assert rec_update.state == db.State(model="Model1", name="State1") + assert rec_update.description == "old description" + + rec_update.description = "updated description 2" + rec_update.state = db.State(name="State2", model="Model1") + # transition 1 -> 2 is allowed. + rec_update.update() + + # third update without state change + rec_update = db.Record(id=rec_insert.id).retrieve() + assert rec_update.state == db.State(model="Model1", name="State2") + assert rec_update.description == "updated description 2" + + rec_update.description = "updated description 3" + # transition 2 -> 2 is also allowed. + rec_update.update() + + rec_final = db.Record.retrieve(rec_insert.id) + assert rec_final.description == "updated description 3" + assert rec_final.state == db.State(model="Model1", name="State2") + + +def test_transfer_state_acl(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + insert_rec = rec.insert(flags={"ACL": None}) + + state_acl = db.ACL().combine(db.get_global_acl()) + state_acl.grant(role="role1", permission="UPDATE:DESCRIPTION") + + # the acl has been transfered from the state record + assert insert_rec.acl == state_acl + + +def test_full_edit_review_publish_cycle(): + edit_state = db.Record( + "EditState", + description="Any user can edit, only team-leader can delete.").add_parent("State").insert( + flags={ + "ACL": None}) + + edit_acl = db.ACL() + edit_acl.grant(role="team-leader", permission="*") + edit_acl.grant(role="reviewer", permission="UPDATE:*") + edit_acl.grant(role="normal", permission="UPDATE:*") + edit_acl = db.State.create_state_acl(edit_acl) + edit_state.acl = edit_acl.combine(edit_state.acl) + edit_state.update_acl() + + review_state = db.Record( + "ReviewState", + description="Only users with the 'reviewer' role can edit, only team-leader can delete.").add_parent("State").insert( + flags={ + "ACL": None}) + + review_acl = db.ACL() + review_acl.grant(role="team-leader", permission="*") + review_acl.grant(role="reviewer", permission="UPDATE:*") + review_acl = db.State.create_state_acl(review_acl) + review_state.acl = review_acl.combine(review_state.acl) + review_state.update_acl() + + published_state = db.Record( + "PublishedState", + description="Entity is read-only for everyone.").add_parent("State").insert( + flags={ + "ACL": None}) + + published_acl = db.ACL() + published_acl = db.State.create_state_acl(published_acl) + published_state.acl = published_acl.combine(published_state.acl) + published_state.update_acl() + + db.Record("EditTransition").add_parent("Transition").add_property( + "from", "EditState").add_property("to", "EditState").insert() + db.Record("StartReviewTransition").add_parent("Transition").add_property( + "from", "EditState").add_property("to", "ReviewState").insert() + db.Record("ReviewTransition").add_parent("Transition").add_property( + "from", "ReviewState").add_property("to", "ReviewState").insert() + db.Record("RejectTransition").add_parent("Transition").add_property( + "from", "ReviewState").add_property("to", "EditState").insert() + db.Record("PublishTransition").add_parent("Transition").add_property( + "from", "ReviewState").add_property("to", "PublishedState").insert() + db.Record("UnpublishTransition").add_parent("Transition").add_property( + "from", "PublishedState").add_property("to", "EditState").insert() + + db.Record("EditReviewPublish").add_parent("StateModel").add_property( + "Transition", + datatype=db.LIST("Transition"), + value=[ + "EditTransition", + "StartReviewTransition", + "ReviewTransition", + "RejectTransition", + "PublishTransition", + "UnpublishTransition"]).add_property( + "initial", + "EditState").add_property( + "final", + "EditState").insert() + db.Property("TestProperty", datatype=db.TEXT).insert() + + def val(): + s = "val" + i = 0 + while True: + i += 1 + yield s + str(i) + val = val() + # tests begin + + rec = db.Record().add_parent("TestRT") + rec.add_property("TestProperty", "val1") + rec.state = db.State(model="EditReviewPublish", name="EditState") + rec.insert() + + # as team-leader + switch_to_test_user("team-leader") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # start review + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="ReviewState") + rec.update() + + # as team-leader + switch_to_test_user("team-leader") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # reject + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="EditState") + rec.update() + + # as team-leader + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # start review + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="ReviewState") + rec.update() + + # as team-leader + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # publish + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="PublishedState") + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + # updating the property and the state fails + assert "You are not allowed to do this." in str(exc.value) + + rec = db.Record(id=rec.id).retrieve() + rec.state = db.State(model="EditReviewPublish", name="PublishedState") + rec.update(sync=False) + + # as team-leader + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + # unpublish + switch_to_test_user("team-leader") + rec.state = db.State(model="EditReviewPublish", name="EditState") + rec.update() + + # as team-leader + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as reviewer + switch_to_test_user("reviewer") + rec.get_property("TestProperty").value = next(val) + rec.update() + + # as other user + switch_to_test_user("normal") + rec.get_property("TestProperty").value = next(val) + rec.update() + + +def test_automatic_record_state(): + rt = db.RecordType("StateFullRT").add_parent("TestRT") + rt.state = db.State(model="Model1", name="State1") + rt.insert() + + rec = db.Record("TestRec").add_parent("StateFullRT") + rec.insert() + + assert rec.state == db.State(model="Model1", name="State1") + + rec_retrieve = db.Record(id=rec.id).retrieve() + assert rec_retrieve.state == db.State(model="Model1", name="State1") + + +def test_unauthorized_final(): + rec = db.Record().add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec.insert() + + switch_to_test_user("normal") + rec.state = None + with pytest.raises(db.TransactionError) as exc: + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + + rec_retrieve = db.Record(id=rec.id).retrieve() + assert rec_retrieve.state == db.State(model="Model1", name="State1") + + switch_to_test_user("team-leader") + rec.update() + + assert rec.state is None + + rec_retrieve = db.Record(id=rec.id).retrieve() + assert rec_retrieve.state is None + + +def test_unauthorized_initial(): + rec = db.Record().add_parent("TestRT") + rec.insert() + + switch_to_test_user("normal") + rec.state = db.State(model="Model1", name="State1") + with pytest.raises(db.TransactionError) as exc: + # normal user lacks the permission for the initial state + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + rec_retrieve = db.Record(id=rec.id).retrieve() + assert rec_retrieve.state is None + + switch_to_test_user("team-leader") + with pytest.raises(db.TransactionError) as exc: + # it is not allowed to "steal" the entity with the state feature + rec.update(sync=False) + assert "You are not allowed to do this." in str(exc.value) + rec_retrieve = db.Record(id=rec.id).retrieve() + assert rec_retrieve.state is None + + # we need to give ownership to "team-leader" + switch_to_admin_user() + rec_update = db.Record(id=rec.id).retrieve(flags={"ACL": None}) + rec_update.acl.grant(role="team-leader", permission="EDIT:ACL") + rec_update.acl.grant(role="team-leader", permission="RETRIEVE:ACL") + rec_update.update_acl() + + switch_to_test_user("team-leader") + rec.update(sync=False) + assert rec.state == db.State(model="Model1", name="State1") + + rec_retrieve = db.Record(id=rec.id).retrieve() + assert rec_retrieve.state == db.State(model="Model1", name="State1") + + +@pytest.mark.xfail( + reason="This is a very special corner case bug with low severity") +def test_transitions_included_after_empty_update(): + rec = db.Record() + rec.description = "old description" + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec_insert = rec.insert(sync=False) + + assert rec_insert.state.transitions is not None + assert rec_insert.state.transitions == {db.Transition(name="Transition1", + from_state="State1", + to_state="State2")} + + rec_insert.description = "new_description" + rec_insert.state = db.State(model="Model1", name="State2") + rec_update = rec_insert.update(sync=False) + assert rec_update.state.transitions is not None + assert rec_update.state.transitions == {db.Transition(name="Transition2", + from_state="State2", + to_state="State3"), + db.Transition(name="Transition4", + from_state="State2", + to_state="State2")} + + rec_update_2 = rec_update.update(sync=False) + + # this fails + assert rec_update_2.state.transitions is not None + assert rec_update_2.state.transitions == {db.Transition(name="Transition1", + from_state="State1", + to_state="State2"), + db.Transition(name="Transition4", + from_state="State2", + to_state="State2")}