Skip to content
Snippets Groups Projects
Verified Commit d3ac4239 authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP: multi retrieve

parent 8d56a257
No related branches found
No related tags found
1 merge request!5F multi retrieve
Pipeline #11021 passed
Pipeline: caosdb-cppinttest

#11027

    This commit is part of merge request !5. Comments created here will be created in the context of that merge request.
    ......@@ -71,10 +71,6 @@ public:
    private:
    inline Messages() : wrapped(nullptr){};
    explicit inline Messages(
    ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Message>
    *wrapped)
    : wrapped(wrapped){};
    ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Message>
    *wrapped;
    ......
    ......@@ -181,4 +181,10 @@ void caosdb_log_trace(const char *channel, const char *msg);
    BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), Channel, \
    CAOSDB_LOG_LEVEL_TRACE)
    #define CAOSDB_LOG_ERROR_AND_RETURN_STATUS(Channel, StatusCode, Message) \
    CAOSDB_LOG_ERROR(Channel) \
    << "StatusCode (" << StatusCode << ") " \
    << caosdb::get_status_description(StatusCode) << ": " << Message; \
    return StatusCode;
    #endif
    ......@@ -46,6 +46,8 @@ enum StatusCode {
    CONFIGURATION_ERROR = 23,
    UNKNOWN_CONNECTION_ERROR = 24,
    TRANSACTION_STATUS_ERROR = 25,
    TRANSACTION_TYPE_ERROR = 26,
    UNSUPPORTED_FEATURE = 27,
    };
    auto get_status_description(int code) -> const std::string &;
    ......
    ......@@ -24,13 +24,81 @@
    /**
    * @brief Creation and execution of transactions.
    */
    #include "caosdb/entity.h" // for Entity
    #include "caosdb/entity.h" // for Entity
    #include "caosdb/logging.h"
    #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionSe...
    #include "caosdb/entity/v1alpha1/main.pb.h" // for Entity, RetrieveReq...
    #include "caosdb/transaction_status.h" // for TransactionStatus
    #include "caosdb/status_code.h" // for StatusCode
    #include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso...
    #include <memory> // for shared_ptr, unique_ptr
    #include <string> // for string
    #include <iterator>
    #include <memory> // for shared_ptr, unique_ptr
    #include <string> // for string
    /*
    * Do all necessary checks and assure that another retrieval (by id or by
    * query) can be added as a sub-request to a transaction.
    */
    #define ASSERT_CAN_ADD_RETRIEVAL \
    if (!IsStatus(TransactionStatus::INITIAL())) { \
    return StatusCode::TRANSACTION_STATUS_ERROR; \
    } \
    switch (this->transaction_type) { \
    case NONE: \
    this->transaction_type = TransactionType::READ_ONLY; \
    case READ_ONLY: \
    case MIXED_READ_AND_WRITE: \
    break; \
    default: \
    CAOSDB_LOG_ERROR_AND_RETURN_STATUS( \
    logger_name, StatusCode::TRANSACTION_TYPE_ERROR, \
    "You cannot add a retrieval to this transaction because it has the " \
    "wrong TransactionType.") \
    }
    /*
    * Do all necessary checks and assure that another deletion can be added as a
    * sub-request to a transaction.
    */
    #define ASSERT_CAN_ADD_DELETION \
    if (!IsStatus(TransactionStatus::INITIAL())) { \
    return StatusCode::TRANSACTION_STATUS_ERROR; \
    } \
    switch (this->transaction_type) { \
    case NONE: \
    this->transaction_type = TransactionType::DELETE; \
    case DELETE: \
    case MIXED_WRITE: \
    case MIXED_READ_AND_WRITE: \
    break; \
    default: \
    CAOSDB_LOG_ERROR_AND_RETURN_STATUS( \
    logger_name, StatusCode::TRANSACTION_TYPE_ERROR, \
    "You cannot add a deletion to this transaction because it has the " \
    "wrong TransactionType.") \
    }
    /*
    * Do all necessary checks and assure that another insertion can be added as a
    * sub-request to a transaction.
    */
    #define ASSERT_CAN_ADD_INSERTION \
    if (!IsStatus(TransactionStatus::INITIAL())) { \
    return StatusCode::TRANSACTION_STATUS_ERROR; \
    } \
    switch (this->transaction_type) { \
    case NONE: \
    this->transaction_type = TransactionType::INSERT; \
    case INSERT: \
    case MIXED_WRITE: \
    case MIXED_READ_AND_WRITE: \
    break; \
    default: \
    CAOSDB_LOG_ERROR_AND_RETURN_STATUS( \
    logger_name, StatusCode::TRANSACTION_TYPE_ERROR, \
    "You cannot add an insertion to this transaction because it has the " \
    "wrong TransactionType.") \
    }
    namespace caosdb::transaction {
    using caosdb::entity::Entity;
    ......@@ -40,20 +108,60 @@ using caosdb::entity::v1alpha1::IdResponse;
    using caosdb::entity::v1alpha1::MultiTransactionRequest;
    using caosdb::entity::v1alpha1::MultiTransactionResponse;
    using caosdb::transaction::TransactionStatus;
    using WrappedResponseCase =
    caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase;
    static const std::string logger_name = "caosdb::transaction";
    class ResultSet {
    public:
    virtual ~ResultSet() = default;
    [[nodiscard]] virtual auto Size() const noexcept -> int = 0;
    };
    class MultiResultSet : public ResultSet {
    public:
    ~MultiResultSet() = default;
    explicit inline MultiResultSet(MultiTransactionResponse *response) {
    auto responses = response->mutable_responses();
    Entity *entity = nullptr;
    for (auto sub_response : *responses) {
    switch (sub_response.wrapped_response_case()) {
    case WrappedResponseCase::kRetrieveResponse:
    entity = new Entity(
    sub_response.mutable_retrieve_response()->release_entity());
    break;
    case WrappedResponseCase::kInsertResponse:
    entity = new Entity(sub_response.release_insert_response());
    break;
    case WrappedResponseCase::kDeleteResponse:
    entity = new Entity(sub_response.release_delete_response());
    break;
    default:
    // TODO(tf) Updates
    break;
    }
    if (entity) {
    this->entities.push_back(std::unique_ptr<Entity>(entity));
    }
    }
    }
    [[nodiscard]] inline auto Size() const noexcept -> int override {
    return this->entities.size();
    }
    std::vector<std::unique_ptr<Entity>> entities;
    };
    class UniqueResult : public ResultSet {
    public:
    ~UniqueResult(){};
    ~UniqueResult() = default;
    explicit inline UniqueResult(ProtoEntity *protoEntity)
    : entity(new Entity(protoEntity)){};
    explicit inline UniqueResult(IdResponse *idResponse)
    : entity(new Entity(idResponse)){};
    [[nodiscard]] auto GetEntity() const -> const Entity &;
    [[nodiscard]] inline auto Size() const noexcept -> int override { return 1; }
    private:
    std::unique_ptr<Entity> entity;
    ......@@ -63,15 +171,16 @@ private:
    * @brief Create a transaction via `CaosDBConnection.createTransaction()`
    */
    class Transaction {
    private:
    mutable std::unique_ptr<ResultSet> result_set;
    mutable TransactionStatus status = TransactionStatus::INITIAL();
    std::shared_ptr<EntityTransactionService::Stub> service_stub;
    MultiTransactionRequest *request;
    mutable MultiTransactionResponse *response;
    std::string error_message;
    public:
    enum TransactionType {
    NONE,
    READ_ONLY,
    INSERT,
    UPDATE,
    DELETE,
    MIXED_WRITE,
    MIXED_READ_AND_WRITE
    };
    Transaction(std::shared_ptr<EntityTransactionService::Stub> service_stub);
    /**
    ......@@ -80,7 +189,21 @@ public:
    * The retrieval is being processed when the Execute() or
    * ExecuteAsynchronously() methods of this transaction are called.
    */
    auto RetrieveById(const std::string &id) -> void;
    auto RetrieveById(const std::string &id) noexcept -> StatusCode;
    /**
    * Add all entity ids to this transaction for retrieval.
    */
    template <class InputIterator>
    inline auto RetrieveById(InputIterator begin, InputIterator end) noexcept
    -> StatusCode;
    /**
    * Add a query to this transaction.
    *
    * Currently, only FIND and COUNT queries are supported.
    */
    auto Query(const std::string &query) noexcept -> StatusCode;
    /**
    * Add the entity to this transaction for an insertion.
    ......@@ -90,7 +213,7 @@ public:
    *
    * Changing the entity afterwards results in undefined behavior.
    */
    auto InsertEntity(Entity *entity) -> void;
    auto InsertEntity(Entity *entity) noexcept -> StatusCode;
    /**
    * Add an entity id to this transaction for deletion.
    ......@@ -98,7 +221,7 @@ public:
    * The deletion is being processed when the Execute() or
    * ExecuteAsynchronously() methods of this transaction are called.
    */
    auto DeleteById(const std::string &id) -> void;
    auto DeleteById(const std::string &id) noexcept -> StatusCode;
    inline auto IsStatus(const TransactionStatus &status) const noexcept -> bool {
    return this->status.GetCode() == status.GetCode();
    ......@@ -116,7 +239,7 @@ public:
    *
    * Use WaitForIt() to join the back-ground execution of this transaction.
    */
    auto ExecuteAsynchronously() noexcept -> void;
    auto ExecuteAsynchronously() noexcept -> StatusCode;
    /**
    * Join the background execution and return the status when the execution
    ......@@ -138,13 +261,48 @@ public:
    return *result_set;
    }
    /**
    * Return the number of sub-requests in this transaction.
    *
    * This is meant for debugging because the number of sub-requests is a
    * GRPC-API detail.
    */
    [[nodiscard]] inline auto GetRequestCount() const -> int {
    return this->request->requests_size();
    }
    inline auto RequestToString() const -> const std::string {
    google::protobuf::util::JsonOptions options;
    std::string out;
    google::protobuf::util::MessageToJsonString(*this->request, &out, options);
    return out;
    }
    private:
    TransactionType transaction_type = TransactionType::NONE;
    mutable std::unique_ptr<ResultSet> result_set;
    mutable TransactionStatus status = TransactionStatus::INITIAL();
    std::shared_ptr<EntityTransactionService::Stub> service_stub;
    MultiTransactionRequest *request;
    mutable MultiTransactionResponse *response;
    std::string error_message;
    };
    template <class InputIterator>
    inline auto Transaction::RetrieveById(InputIterator begin,
    InputIterator end) noexcept
    -> StatusCode {
    ASSERT_CAN_ADD_RETRIEVAL
    auto next = begin;
    while (next != end) {
    auto *sub_request = this->request->add_requests();
    sub_request->mutable_retrieve_request()->set_id(*next);
    next = std::next(next);
    }
    return StatusCode::INITIAL;
    }
    } // namespace caosdb::transaction
    #endif
    ......@@ -18,9 +18,10 @@
    *
    */
    #include "caosdb/transaction.h"
    #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS...
    #include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest
    #include "caosdb/exceptions.h" // for TransactionError, ...
    #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS...
    #include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest
    #include "caosdb/exceptions.h" // for TransactionError, ...
    #include "caosdb/logging.h"
    #include "caosdb/protobuf_helper.h" // for get_arena
    #include "caosdb/status_code.h" // for StatusCode, AUTHEN...
    #include "google/protobuf/arena.h" // for Arena
    ......@@ -64,7 +65,12 @@ auto get_status_description(int code) -> const std::string & {
    {StatusCode::UNKNOWN_CONNECTION_ERROR,
    "The ConnectionManager does not know any connection of this name."},
    {StatusCode::TRANSACTION_STATUS_ERROR,
    "The Transaction is in a wrong state for the attempted action."}};
    "The Transaction is in a wrong state for the attempted action."},
    {StatusCode::TRANSACTION_TYPE_ERROR,
    "The Transaction has a transaction type which does not allow the "
    "attempted action."},
    {StatusCode::UNSUPPORTED_FEATURE,
    "This feature is not available in the this client implementation."}};
    try {
    return descriptions.at(code);
    } catch (const std::out_of_range &exc) {
    ......@@ -80,8 +86,6 @@ using caosdb::entity::v1alpha1::MultiTransactionRequest;
    using caosdb::entity::v1alpha1::MultiTransactionResponse;
    using WrappedResponseCase =
    caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase;
    using caosdb::exceptions::TransactionError;
    using caosdb::exceptions::TransactionStatusError;
    using caosdb::utility::get_arena;
    using grpc::ClientAsyncResponseReader;
    using ProtoEntity = caosdb::entity::v1alpha1::Entity;
    ......@@ -101,55 +105,42 @@ Transaction::Transaction(
    this->service_stub = std::move(service_stub);
    }
    auto Transaction::RetrieveById(const std::string &id) -> void {
    if (!IsStatus(TransactionStatus::INITIAL())) {
    throw TransactionStatusError(
    caosdb::get_status_description(StatusCode::TRANSACTION_STATUS_ERROR));
    }
    // TODO(tf) remove checks when the server is ready
    if (this->request->requests_size() > 0) {
    throw TransactionError(
    "This request object cannot handle another RetrieveById sub-request");
    }
    auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode {
    ASSERT_CAN_ADD_RETRIEVAL
    auto *sub_request = this->request->add_requests();
    sub_request->mutable_retrieve_request()->set_id(id);
    return StatusCode::INITIAL;
    }
    auto Transaction::DeleteById(const std::string &id) -> void {
    if (!IsStatus(TransactionStatus::INITIAL())) {
    throw TransactionStatusError(
    caosdb::get_status_description(StatusCode::TRANSACTION_STATUS_ERROR));
    }
    auto Transaction::Query(const std::string &query) noexcept -> StatusCode {
    ASSERT_CAN_ADD_RETRIEVAL
    // TODO(tf) remove checks when the server is ready
    if (this->request->requests_size() > 0) {
    throw TransactionError(
    "This request object cannot handle another DeleteById sub-request");
    }
    auto *sub_request = this->request->add_requests();
    sub_request->mutable_retrieve_request()->mutable_query()->set_query(query);
    return StatusCode::INITIAL;
    }
    auto Transaction::DeleteById(const std::string &id) noexcept -> StatusCode {
    ASSERT_CAN_ADD_DELETION
    auto *sub_request = this->request->add_requests();
    sub_request->mutable_delete_request()->set_id(id);
    }
    auto Transaction::InsertEntity(Entity *entity) -> void {
    if (!IsStatus(TransactionStatus::INITIAL())) {
    throw TransactionStatusError(
    caosdb::get_status_description(StatusCode::TRANSACTION_STATUS_ERROR));
    }
    return StatusCode::INITIAL;
    }
    // TODO(tf) remove checks when the server is ready
    if (this->request->requests_size() > 0) {
    throw TransactionError(
    "This request object cannot handle another DeleteById sub-request");
    }
    auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode {
    ASSERT_CAN_ADD_INSERTION
    auto *sub_request = this->request->add_requests();
    auto *proto_entity = sub_request->mutable_insert_request();
    // swap and switch
    entity->Switch(proto_entity);
    return StatusCode::INITIAL;
    }
    auto Transaction::Execute() -> TransactionStatus {
    ......@@ -158,7 +149,26 @@ auto Transaction::Execute() -> TransactionStatus {
    return this->status;
    }
    auto Transaction::ExecuteAsynchronously() noexcept -> void {
    auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    if (!IsStatus(TransactionStatus::INITIAL())) {
    return StatusCode::TRANSACTION_STATUS_ERROR;
    }
    switch (this->transaction_type) {
    case MIXED_WRITE:
    CAOSDB_LOG_ERROR_AND_RETURN_STATUS(
    logger_name, StatusCode::UNSUPPORTED_FEATURE,
    "MIXED_WRITE UNSUPPORTED: The current implementation does not support "
    "mixed write transactions (containing insertions, deletions, and updates "
    "in one transaction).")
    case MIXED_READ_AND_WRITE:
    CAOSDB_LOG_ERROR_AND_RETURN_STATUS(
    logger_name, StatusCode::UNSUPPORTED_FEATURE,
    "MIXED_WRITE UNSUPPORTED: The current implementation does not support "
    "mixed read and write transactions (containing retrievals, insertions, "
    "deletions, and updates in one transaction).")
    default:
    break;
    }
    this->status = TransactionStatus::EXECUTING();
    grpc::Status grpc_status;
    ......@@ -197,6 +207,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> void {
    } else {
    this->status = TransactionStatus::SUCCESS();
    }
    return StatusCode::EXECUTING;
    }
    auto Transaction::WaitForIt() const noexcept -> TransactionStatus {
    ......@@ -232,7 +243,7 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus {
    break;
    }
    } else {
    // TODO(tf)
    this->result_set = std::make_unique<MultiResultSet>(this->response);
    }
    return this->status;
    ......
    /*
    *
    * This file is a part of the CaosDB Project.
    *
    * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com>
    ......@@ -17,7 +16,6 @@
    *
    * You should have received a copy of the GNU Affero General Public License
    * along with this program. If not, see <https://www.gnu.org/licenses/>.
    *
    */
    #include "caosdb/configuration.h" // for InsecureConnectionConfig...
    #include "caosdb/connection.h" // for Connection
    ......@@ -73,9 +71,59 @@ TEST(test_transaction, test_unavailable) {
    transaction->RetrieveById("100");
    transaction->ExecuteAsynchronously();
    EXPECT_EQ(transaction->GetRequestCount(), 1);
    auto status = transaction->WaitForIt();
    EXPECT_EQ(status.GetCode(), StatusCode::CONNECTION_ERROR);
    }
    TEST(test_transaction, test_retrieve_by_ids) {
    const auto *host = "localhost";
    auto configuration = InsecureConnectionConfiguration(host, 8000);
    Connection connection(configuration);
    auto transaction = connection.CreateTransaction();
    std::vector<std::string> ids = {"100", "101", "102"};
    transaction->RetrieveById(ids.begin(), ids.end());
    EXPECT_EQ(transaction->GetRequestCount(), 3);
    }
    TEST(test_transaction, test_multi_result_set_empty) {
    MultiTransactionResponse response;
    MultiResultSet rs(&response);
    EXPECT_EQ(rs.Size(), 0);
    }
    TEST(test_transaction, test_multi_result_set_one) {
    MultiTransactionResponse response;
    response.add_responses()
    ->mutable_retrieve_response()
    ->mutable_entity()
    ->set_id("100");
    MultiResultSet rs(&response);
    EXPECT_EQ(rs.Size(), 1);
    }
    TEST(test_transaction, test_multi_result_set_three) {
    MultiTransactionResponse response;
    response.add_responses()
    ->mutable_retrieve_response()
    ->mutable_entity()
    ->set_id("100");
    response.add_responses()
    ->mutable_retrieve_response()
    ->mutable_entity()
    ->set_id("101");
    response.add_responses()
    ->mutable_retrieve_response()
    ->mutable_entity()
    ->set_id("102");
    MultiResultSet rs(&response);
    EXPECT_EQ(rs.Size(), 3);
    }
    } // namespace caosdb::transaction
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment