From d3ac4239b7eb2aca291a133a7b42478274f264d6 Mon Sep 17 00:00:00 2001 From: Timm Fitschen <t.fitschen@indiscale.com> Date: Sun, 1 Aug 2021 12:23:12 +0200 Subject: [PATCH] WIP: multi retrieve --- include/caosdb/entity.h | 4 - include/caosdb/logging.h | 6 ++ include/caosdb/status_code.h | 2 + include/caosdb/transaction.h | 190 ++++++++++++++++++++++++++++++++--- src/caosdb/transaction.cpp | 93 +++++++++-------- test/test_transaction.cpp | 52 +++++++++- 6 files changed, 284 insertions(+), 63 deletions(-) diff --git a/include/caosdb/entity.h b/include/caosdb/entity.h index 46ac1b7..ace778c 100644 --- a/include/caosdb/entity.h +++ b/include/caosdb/entity.h @@ -71,10 +71,6 @@ public: private: inline Messages() : wrapped(nullptr){}; - explicit inline Messages( - ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Message> - *wrapped) - : wrapped(wrapped){}; ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Message> *wrapped; diff --git a/include/caosdb/logging.h b/include/caosdb/logging.h index 26ec1c8..8d3acdd 100644 --- a/include/caosdb/logging.h +++ b/include/caosdb/logging.h @@ -181,4 +181,10 @@ void caosdb_log_trace(const char *channel, const char *msg); BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), Channel, \ CAOSDB_LOG_LEVEL_TRACE) +#define CAOSDB_LOG_ERROR_AND_RETURN_STATUS(Channel, StatusCode, Message) \ + CAOSDB_LOG_ERROR(Channel) \ + << "StatusCode (" << StatusCode << ") " \ + << caosdb::get_status_description(StatusCode) << ": " << Message; \ + return StatusCode; + #endif diff --git a/include/caosdb/status_code.h b/include/caosdb/status_code.h index 021f893..7d2f9f9 100644 --- a/include/caosdb/status_code.h +++ b/include/caosdb/status_code.h @@ -46,6 +46,8 @@ enum StatusCode { CONFIGURATION_ERROR = 23, UNKNOWN_CONNECTION_ERROR = 24, TRANSACTION_STATUS_ERROR = 25, + TRANSACTION_TYPE_ERROR = 26, + UNSUPPORTED_FEATURE = 27, }; auto get_status_description(int code) -> const std::string &; diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index 8b7dd95..56bd966 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -24,13 +24,81 @@ /** * @brief Creation and execution of transactions. */ -#include "caosdb/entity.h" // for Entity +#include "caosdb/entity.h" // for Entity +#include "caosdb/logging.h" #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionSe... #include "caosdb/entity/v1alpha1/main.pb.h" // for Entity, RetrieveReq... #include "caosdb/transaction_status.h" // for TransactionStatus +#include "caosdb/status_code.h" // for StatusCode #include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso... -#include <memory> // for shared_ptr, unique_ptr -#include <string> // for string +#include <iterator> +#include <memory> // for shared_ptr, unique_ptr +#include <string> // for string + +/* + * Do all necessary checks and assure that another retrieval (by id or by + * query) can be added as a sub-request to a transaction. + */ +#define ASSERT_CAN_ADD_RETRIEVAL \ + if (!IsStatus(TransactionStatus::INITIAL())) { \ + return StatusCode::TRANSACTION_STATUS_ERROR; \ + } \ + switch (this->transaction_type) { \ + case NONE: \ + this->transaction_type = TransactionType::READ_ONLY; \ + case READ_ONLY: \ + case MIXED_READ_AND_WRITE: \ + break; \ + default: \ + CAOSDB_LOG_ERROR_AND_RETURN_STATUS( \ + logger_name, StatusCode::TRANSACTION_TYPE_ERROR, \ + "You cannot add a retrieval to this transaction because it has the " \ + "wrong TransactionType.") \ + } + +/* + * Do all necessary checks and assure that another deletion can be added as a + * sub-request to a transaction. + */ +#define ASSERT_CAN_ADD_DELETION \ + if (!IsStatus(TransactionStatus::INITIAL())) { \ + return StatusCode::TRANSACTION_STATUS_ERROR; \ + } \ + switch (this->transaction_type) { \ + case NONE: \ + this->transaction_type = TransactionType::DELETE; \ + case DELETE: \ + case MIXED_WRITE: \ + case MIXED_READ_AND_WRITE: \ + break; \ + default: \ + CAOSDB_LOG_ERROR_AND_RETURN_STATUS( \ + logger_name, StatusCode::TRANSACTION_TYPE_ERROR, \ + "You cannot add a deletion to this transaction because it has the " \ + "wrong TransactionType.") \ + } + +/* + * Do all necessary checks and assure that another insertion can be added as a + * sub-request to a transaction. + */ +#define ASSERT_CAN_ADD_INSERTION \ + if (!IsStatus(TransactionStatus::INITIAL())) { \ + return StatusCode::TRANSACTION_STATUS_ERROR; \ + } \ + switch (this->transaction_type) { \ + case NONE: \ + this->transaction_type = TransactionType::INSERT; \ + case INSERT: \ + case MIXED_WRITE: \ + case MIXED_READ_AND_WRITE: \ + break; \ + default: \ + CAOSDB_LOG_ERROR_AND_RETURN_STATUS( \ + logger_name, StatusCode::TRANSACTION_TYPE_ERROR, \ + "You cannot add an insertion to this transaction because it has the " \ + "wrong TransactionType.") \ + } namespace caosdb::transaction { using caosdb::entity::Entity; @@ -40,20 +108,60 @@ using caosdb::entity::v1alpha1::IdResponse; using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionResponse; using caosdb::transaction::TransactionStatus; +using WrappedResponseCase = + caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase; + +static const std::string logger_name = "caosdb::transaction"; class ResultSet { public: virtual ~ResultSet() = default; + [[nodiscard]] virtual auto Size() const noexcept -> int = 0; +}; + +class MultiResultSet : public ResultSet { +public: + ~MultiResultSet() = default; + explicit inline MultiResultSet(MultiTransactionResponse *response) { + auto responses = response->mutable_responses(); + Entity *entity = nullptr; + for (auto sub_response : *responses) { + switch (sub_response.wrapped_response_case()) { + case WrappedResponseCase::kRetrieveResponse: + entity = new Entity( + sub_response.mutable_retrieve_response()->release_entity()); + break; + case WrappedResponseCase::kInsertResponse: + + entity = new Entity(sub_response.release_insert_response()); + break; + case WrappedResponseCase::kDeleteResponse: + entity = new Entity(sub_response.release_delete_response()); + break; + default: + // TODO(tf) Updates + break; + } + if (entity) { + this->entities.push_back(std::unique_ptr<Entity>(entity)); + } + } + } + [[nodiscard]] inline auto Size() const noexcept -> int override { + return this->entities.size(); + } + std::vector<std::unique_ptr<Entity>> entities; }; class UniqueResult : public ResultSet { public: - ~UniqueResult(){}; + ~UniqueResult() = default; explicit inline UniqueResult(ProtoEntity *protoEntity) : entity(new Entity(protoEntity)){}; explicit inline UniqueResult(IdResponse *idResponse) : entity(new Entity(idResponse)){}; [[nodiscard]] auto GetEntity() const -> const Entity &; + [[nodiscard]] inline auto Size() const noexcept -> int override { return 1; } private: std::unique_ptr<Entity> entity; @@ -63,15 +171,16 @@ private: * @brief Create a transaction via `CaosDBConnection.createTransaction()` */ class Transaction { -private: - mutable std::unique_ptr<ResultSet> result_set; - mutable TransactionStatus status = TransactionStatus::INITIAL(); - std::shared_ptr<EntityTransactionService::Stub> service_stub; - MultiTransactionRequest *request; - mutable MultiTransactionResponse *response; - std::string error_message; - public: + enum TransactionType { + NONE, + READ_ONLY, + INSERT, + UPDATE, + DELETE, + MIXED_WRITE, + MIXED_READ_AND_WRITE + }; Transaction(std::shared_ptr<EntityTransactionService::Stub> service_stub); /** @@ -80,7 +189,21 @@ public: * The retrieval is being processed when the Execute() or * ExecuteAsynchronously() methods of this transaction are called. */ - auto RetrieveById(const std::string &id) -> void; + auto RetrieveById(const std::string &id) noexcept -> StatusCode; + + /** + * Add all entity ids to this transaction for retrieval. + */ + template <class InputIterator> + inline auto RetrieveById(InputIterator begin, InputIterator end) noexcept + -> StatusCode; + + /** + * Add a query to this transaction. + * + * Currently, only FIND and COUNT queries are supported. + */ + auto Query(const std::string &query) noexcept -> StatusCode; /** * Add the entity to this transaction for an insertion. @@ -90,7 +213,7 @@ public: * * Changing the entity afterwards results in undefined behavior. */ - auto InsertEntity(Entity *entity) -> void; + auto InsertEntity(Entity *entity) noexcept -> StatusCode; /** * Add an entity id to this transaction for deletion. @@ -98,7 +221,7 @@ public: * The deletion is being processed when the Execute() or * ExecuteAsynchronously() methods of this transaction are called. */ - auto DeleteById(const std::string &id) -> void; + auto DeleteById(const std::string &id) noexcept -> StatusCode; inline auto IsStatus(const TransactionStatus &status) const noexcept -> bool { return this->status.GetCode() == status.GetCode(); @@ -116,7 +239,7 @@ public: * * Use WaitForIt() to join the back-ground execution of this transaction. */ - auto ExecuteAsynchronously() noexcept -> void; + auto ExecuteAsynchronously() noexcept -> StatusCode; /** * Join the background execution and return the status when the execution @@ -138,13 +261,48 @@ public: return *result_set; } + /** + * Return the number of sub-requests in this transaction. + * + * This is meant for debugging because the number of sub-requests is a + * GRPC-API detail. + */ + [[nodiscard]] inline auto GetRequestCount() const -> int { + return this->request->requests_size(); + } + inline auto RequestToString() const -> const std::string { google::protobuf::util::JsonOptions options; std::string out; google::protobuf::util::MessageToJsonString(*this->request, &out, options); return out; } + +private: + TransactionType transaction_type = TransactionType::NONE; + mutable std::unique_ptr<ResultSet> result_set; + mutable TransactionStatus status = TransactionStatus::INITIAL(); + std::shared_ptr<EntityTransactionService::Stub> service_stub; + MultiTransactionRequest *request; + mutable MultiTransactionResponse *response; + std::string error_message; }; +template <class InputIterator> +inline auto Transaction::RetrieveById(InputIterator begin, + InputIterator end) noexcept + -> StatusCode { + ASSERT_CAN_ADD_RETRIEVAL + + auto next = begin; + while (next != end) { + auto *sub_request = this->request->add_requests(); + sub_request->mutable_retrieve_request()->set_id(*next); + next = std::next(next); + } + + return StatusCode::INITIAL; +} + } // namespace caosdb::transaction #endif diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 380362a..688472e 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -18,9 +18,10 @@ * */ #include "caosdb/transaction.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS... -#include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest -#include "caosdb/exceptions.h" // for TransactionError, ... +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest +#include "caosdb/exceptions.h" // for TransactionError, ... +#include "caosdb/logging.h" #include "caosdb/protobuf_helper.h" // for get_arena #include "caosdb/status_code.h" // for StatusCode, AUTHEN... #include "google/protobuf/arena.h" // for Arena @@ -64,7 +65,12 @@ auto get_status_description(int code) -> const std::string & { {StatusCode::UNKNOWN_CONNECTION_ERROR, "The ConnectionManager does not know any connection of this name."}, {StatusCode::TRANSACTION_STATUS_ERROR, - "The Transaction is in a wrong state for the attempted action."}}; + "The Transaction is in a wrong state for the attempted action."}, + {StatusCode::TRANSACTION_TYPE_ERROR, + "The Transaction has a transaction type which does not allow the " + "attempted action."}, + {StatusCode::UNSUPPORTED_FEATURE, + "This feature is not available in the this client implementation."}}; try { return descriptions.at(code); } catch (const std::out_of_range &exc) { @@ -80,8 +86,6 @@ using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionResponse; using WrappedResponseCase = caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase; -using caosdb::exceptions::TransactionError; -using caosdb::exceptions::TransactionStatusError; using caosdb::utility::get_arena; using grpc::ClientAsyncResponseReader; using ProtoEntity = caosdb::entity::v1alpha1::Entity; @@ -101,55 +105,42 @@ Transaction::Transaction( this->service_stub = std::move(service_stub); } -auto Transaction::RetrieveById(const std::string &id) -> void { - if (!IsStatus(TransactionStatus::INITIAL())) { - throw TransactionStatusError( - caosdb::get_status_description(StatusCode::TRANSACTION_STATUS_ERROR)); - } - - // TODO(tf) remove checks when the server is ready - if (this->request->requests_size() > 0) { - throw TransactionError( - "This request object cannot handle another RetrieveById sub-request"); - } +auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode { + ASSERT_CAN_ADD_RETRIEVAL auto *sub_request = this->request->add_requests(); sub_request->mutable_retrieve_request()->set_id(id); + + return StatusCode::INITIAL; } -auto Transaction::DeleteById(const std::string &id) -> void { - if (!IsStatus(TransactionStatus::INITIAL())) { - throw TransactionStatusError( - caosdb::get_status_description(StatusCode::TRANSACTION_STATUS_ERROR)); - } +auto Transaction::Query(const std::string &query) noexcept -> StatusCode { + ASSERT_CAN_ADD_RETRIEVAL - // TODO(tf) remove checks when the server is ready - if (this->request->requests_size() > 0) { - throw TransactionError( - "This request object cannot handle another DeleteById sub-request"); - } + auto *sub_request = this->request->add_requests(); + sub_request->mutable_retrieve_request()->mutable_query()->set_query(query); + + return StatusCode::INITIAL; +} + +auto Transaction::DeleteById(const std::string &id) noexcept -> StatusCode { + ASSERT_CAN_ADD_DELETION auto *sub_request = this->request->add_requests(); sub_request->mutable_delete_request()->set_id(id); -} -auto Transaction::InsertEntity(Entity *entity) -> void { - if (!IsStatus(TransactionStatus::INITIAL())) { - throw TransactionStatusError( - caosdb::get_status_description(StatusCode::TRANSACTION_STATUS_ERROR)); - } + return StatusCode::INITIAL; +} - // TODO(tf) remove checks when the server is ready - if (this->request->requests_size() > 0) { - throw TransactionError( - "This request object cannot handle another DeleteById sub-request"); - } +auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode { + ASSERT_CAN_ADD_INSERTION auto *sub_request = this->request->add_requests(); auto *proto_entity = sub_request->mutable_insert_request(); - // swap and switch entity->Switch(proto_entity); + + return StatusCode::INITIAL; } auto Transaction::Execute() -> TransactionStatus { @@ -158,7 +149,26 @@ auto Transaction::Execute() -> TransactionStatus { return this->status; } -auto Transaction::ExecuteAsynchronously() noexcept -> void { +auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { + if (!IsStatus(TransactionStatus::INITIAL())) { + return StatusCode::TRANSACTION_STATUS_ERROR; + } + switch (this->transaction_type) { + case MIXED_WRITE: + CAOSDB_LOG_ERROR_AND_RETURN_STATUS( + logger_name, StatusCode::UNSUPPORTED_FEATURE, + "MIXED_WRITE UNSUPPORTED: The current implementation does not support " + "mixed write transactions (containing insertions, deletions, and updates " + "in one transaction).") + case MIXED_READ_AND_WRITE: + CAOSDB_LOG_ERROR_AND_RETURN_STATUS( + logger_name, StatusCode::UNSUPPORTED_FEATURE, + "MIXED_WRITE UNSUPPORTED: The current implementation does not support " + "mixed read and write transactions (containing retrievals, insertions, " + "deletions, and updates in one transaction).") + default: + break; + } this->status = TransactionStatus::EXECUTING(); grpc::Status grpc_status; @@ -197,6 +207,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> void { } else { this->status = TransactionStatus::SUCCESS(); } + return StatusCode::EXECUTING; } auto Transaction::WaitForIt() const noexcept -> TransactionStatus { @@ -232,7 +243,7 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { break; } } else { - // TODO(tf) + this->result_set = std::make_unique<MultiResultSet>(this->response); } return this->status; diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index ab1979f..a36eb4d 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -1,5 +1,4 @@ /* - * * This file is a part of the CaosDB Project. * * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> @@ -17,7 +16,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <https://www.gnu.org/licenses/>. - * */ #include "caosdb/configuration.h" // for InsecureConnectionConfig... #include "caosdb/connection.h" // for Connection @@ -73,9 +71,59 @@ TEST(test_transaction, test_unavailable) { transaction->RetrieveById("100"); transaction->ExecuteAsynchronously(); + EXPECT_EQ(transaction->GetRequestCount(), 1); auto status = transaction->WaitForIt(); EXPECT_EQ(status.GetCode(), StatusCode::CONNECTION_ERROR); } +TEST(test_transaction, test_retrieve_by_ids) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + std::vector<std::string> ids = {"100", "101", "102"}; + transaction->RetrieveById(ids.begin(), ids.end()); + + EXPECT_EQ(transaction->GetRequestCount(), 3); +} + +TEST(test_transaction, test_multi_result_set_empty) { + MultiTransactionResponse response; + + MultiResultSet rs(&response); + EXPECT_EQ(rs.Size(), 0); +} + +TEST(test_transaction, test_multi_result_set_one) { + MultiTransactionResponse response; + response.add_responses() + ->mutable_retrieve_response() + ->mutable_entity() + ->set_id("100"); + + MultiResultSet rs(&response); + EXPECT_EQ(rs.Size(), 1); +} + +TEST(test_transaction, test_multi_result_set_three) { + MultiTransactionResponse response; + response.add_responses() + ->mutable_retrieve_response() + ->mutable_entity() + ->set_id("100"); + response.add_responses() + ->mutable_retrieve_response() + ->mutable_entity() + ->set_id("101"); + response.add_responses() + ->mutable_retrieve_response() + ->mutable_entity() + ->set_id("102"); + + MultiResultSet rs(&response); + EXPECT_EQ(rs.Size(), 3); +} + } // namespace caosdb::transaction -- GitLab