diff --git a/tests/test_state.py b/tests/test_state.py new file mode 100644 index 0000000000000000000000000000000000000000..6a7f707f3e2d4cb46ac4cd313ceeaf35161f05af --- /dev/null +++ b/tests/test_state.py @@ -0,0 +1,162 @@ +import pytest +import caosdb as db + +def teardown_module(): + d = db.execute_query("FIND ENTITY WITH ID > 99") + if len(d) > 0: + d.delete(flags={"forceFinalState": "true"}) + +def setup_module(): + teardown_module() + db.RecordType("State").insert() + db.RecordType("StateModel").insert() + db.RecordType("Transition").insert() + db.Property(name="from", datatype="State").insert() + db.Property(name="to", datatype="State").insert() + db.Property(name="initial", datatype="State").insert() + db.Property(name="final", datatype="State").insert() + + db.Record("State1").add_parent("State").insert() + db.Record("State2").add_parent("State").insert() + db.Record("State3").add_parent("State").insert() + db.Record("Transition1").add_parent("Transition").add_property("from", "State1").add_property("to", "State2").insert() + db.Record("Transition2").add_parent("Transition").add_property("from", "State2").add_property("to", "State3").insert() + db.Record("Transition3").add_parent("Transition").add_property("from", "State3").add_property("to", "State1").insert() + + db.Record("Model1").add_parent("StateModel").add_property("State", datatype=db.LIST("State"), value=["State1", "State2", "State3"]).add_property("Transition", datatype=db.LIST("Transition"), value=["Transition1", "Transition2", "Transition3"]).add_property("initial", "State1").add_property("final", "State1").insert() + +def teardown(): + d = db.execute_query("FIND TestRT") + if len(d) > 0: + d.delete(flags={"forceFinalState": "true"}) + +def setup(): + teardown() + db.RecordType("TestRT").insert() + +def test_state_message(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + assert rec.get_property("State") is None + + rec_insert = rec.insert(sync=False) + assert rec_insert.get_property("State") is None + assert rec_insert.state == rec.state + + rec_retrieve = db.Record.retrieve(rec_insert.id) + assert rec_retrieve.get_property("State") is None + assert rec_retrieve.state == rec.state + +def test_state_query(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec.insert() + assert rec.get_property("State") is None + + assert db.execute_query("FIND TestRT WITH State = State1", unique=True).id == rec.id + +def test_state_transition(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec.insert() + + rec.state = db.State(model="Model1", name="State2") + rec_update = rec.update(sync=False) + + rec_retrieve = db.Record.retrieve(rec.id) + assert rec_retrieve.state == rec_update.state + +def test_transition_not_allowed(): + """unallowed transitions return errors and do not update the entity""" + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec_insert = rec.insert(sync=False) + + rec_insert.state = db.State(model="Model1", name="State3") + with pytest.raises(db.TransactionError): + rec_update = rec_insert.update(sync=False) + + rec_retrieve = db.Record.retrieve(rec_insert.id) + assert rec_retrieve.state == rec.state + +def test_wrong_initial(): + """the first state has to be an initial state""" + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State2") + with pytest.raises(db.TransactionError): + rec.insert() + + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "Initial state not allowed." + +def test_wrong_final(): + """deletion of the entity or the state is only possible in final states""" + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="State1") + rec.insert() + + rec.state = db.State(model="Model1", name="State2") + rec.update() + + with pytest.raises(db.TransactionError): + rec.delete() + + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "Final state not allowed." + + rec.state = db.State(model="Model1", name="State3") + rec.update() + rec.state = db.State(model="Model1", name="State1") + rec.update() + rec.delete() + +def test_multiple_states(): + """currently, only one state is allowed""" + rec = db.Record() + rec.add_parent("TestRT") + + state1 = db.State(model="Model1", name="State1") + state2 = db.State(model="Model1", name="State2") + class TestState: + def to_xml(self, xml): + xml.append(state1.to_xml()) + xml.append(state2.to_xml()) + def clear_server_messages(self): + pass + rec.messages = TestState() + with pytest.raises(db.TransactionError): + rec_insert = rec.insert(sync=False) + print(rec_insert) + +def test_broken_state_missing_model(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(name="State1", model=None) + with pytest.raises(db.TransactionError): + rec.insert() + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "State model not specified." + +def test_broken_state_missing_state_name(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name=None) + with pytest.raises(db.TransactionError): + rec.insert() + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "State not specified." + +def test_state_not_in_state_model(): + rec = db.Record() + rec.add_parent("TestRT") + rec.state = db.State(model="Model1", name="UnknownState") + with pytest.raises(db.TransactionError): + rec.insert() + assert len(rec.get_errors()) == 1 + assert rec.get_errors()[0].description == "State does not exist in this StateModel."