Skip to content
Snippets Groups Projects
Commit c634d5df authored by Florian Spreckelsen's avatar Florian Spreckelsen
Browse files

Merge branch 'f-async-execute' into 'dev'

F async execute

See merge request !28
parents 7b352aad 5e1a32be
Branches
Tags
2 merge requests!42Release 0.2.0,!28F async execute
Pipeline #25210 passed
Pipeline: caosdb-cppinttest

#25211

    This commit is part of merge request !42. Comments created here will be created in the context of that merge request.
    Showing
    with 387 additions and 255 deletions
    ......@@ -14,7 +14,7 @@ class CaosdbConan(ConanFile):
    default_options = {"shared": False, "fPIC": True}
    generators = "cmake"
    requires = [
    ("grpc/1.39.1"),
    ("grpc/1.45.2"),
    ]
    build_requires = [
    ("boost/1.77.0"),
    ......
    ......@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
    ### Changed
    * Transaction::ExecuteAsynchronously is actually asynchronous now.
    * Removed boost from the headers. Boost is only a build dependency from now on.
    ### Deprecated
    ......
    ......@@ -28,11 +28,11 @@
    * @brief Configuration and setup of the client authentication.
    */
    #include "caosdb/utility.h" // for base64_encode
    #include <grpcpp/impl/codegen/interceptor.h> // for Status
    #include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/impl/codegen/string_ref.h> // for string_ref
    #include <grpcpp/security/auth_context.h> // for AuthContext
    #include <grpcpp/security/credentials.h> // for CallCredentials
    #include <grpcpp/support/interceptor.h> // for Status
    #include <grpcpp/support/status.h> // for Status
    #include <grpcpp/support/string_ref.h> // for string_ref
    #include <map> // for multimap
    #include <memory> // for shared_ptr
    #include <string> // for string
    ......
    ......@@ -55,10 +55,10 @@
    #include "caosdb/file_transmission/file_writer.h" // for FileWriter
    #include "caosdb/handler_interface.h" // for HandlerTag, Handl...
    #include <cstdint> // for uint64_t
    #include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncReader
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/client_context.h> // for ClientContext
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <grpcpp/support/async_stream.h> // for ClientAsyncReader
    #include <grpcpp/support/status.h> // for Status
    #include <memory> // for unique_ptr
    namespace caosdb::transaction {
    ......@@ -84,7 +84,11 @@ public:
    DownloadRequestHandler(DownloadRequestHandler &&) = delete;
    DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete;
    void Start() override { OnNext(true); }
    void Start() override {
    if (state_ == CallState::NewCall) {
    OnNext(true);
    }
    }
    bool OnNext(bool ok) override;
    ......
    ......@@ -52,9 +52,9 @@
    #include "caosdb/entity/v1/main.grpc.pb.h" // for FileTransmissionS...
    #include "caosdb/entity/v1/main.pb.h" // for FileDownloadResponse
    #include "caosdb/handler_interface.h" // for HandlerTag, Handl...
    #include "caosdb/unary_rpc_handler.h"
    #include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRespons...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include "caosdb/unary_rpc_handler.h" // for UnaryRpcHandler
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <grpcpp/support/async_unary_call.h> // for ClientAsyncResponseReader
    #include <memory> // for unique_ptr
    namespace caosdb::transaction {
    ......
    ......@@ -55,10 +55,10 @@
    #include "caosdb/file_transmission/file_reader.h" // for FileReader
    #include "caosdb/handler_interface.h" // for HandlerTag, Handl...
    #include <cstdint> // for uint64_t
    #include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWriter
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/client_context.h> // for ClientContext
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <grpcpp/support/async_stream.h> // for ClientAsyncReader
    #include <grpcpp/support/status.h> // for Status
    #include <memory> // for unique_ptr
    namespace caosdb::transaction {
    ......@@ -81,7 +81,11 @@ public:
    UploadRequestHandler(UploadRequestHandler &&) = delete;
    UploadRequestHandler &operator=(UploadRequestHandler &&) = delete;
    void Start() override { OnNext(true); }
    void Start() override {
    if (state_ == CallState::NewCall) {
    OnNext(true);
    }
    }
    bool OnNext(bool ok) override;
    ......
    ......@@ -25,8 +25,8 @@
    #include "caosdb/log_level.h" // for CAOSDB_LOG_...
    #include <cstdint> // for uint64_t
    #include <iosfwd> // for streamsize
    #include <memory> // for shared_ptr
    #include <ostream> // for ostream
    #include <string> // for string
    #include <vector> // for vector
    ......@@ -53,6 +53,30 @@ private:
    int level;
    };
    /**
    * Helper class for logging the entering and leaving of a function or method.
    *
    * Please Use the macro
    *
    * CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, function_name);
    */
    class TraceEnterLeaveLogger {
    public:
    inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name)
    : channel(channel), function_name(function_name) {
    caosdb::logging::LoggerOutputStream::get(this->channel, CAOSDB_LOG_LEVEL_TRACE)
    << "Enter " << this->function_name;
    }
    inline ~TraceEnterLeaveLogger() {
    caosdb::logging::LoggerOutputStream::get(this->channel, CAOSDB_LOG_LEVEL_TRACE)
    << "Leave " << this->function_name;
    }
    private:
    const std::string &channel;
    const std::string function_name;
    };
    /**
    * This class stores the integer log level.
    */
    ......@@ -210,6 +234,9 @@ void caosdb_log_trace(const char *channel, const char *msg);
    #define CAOSDB_LOG_TRACE(Channel) \
    caosdb::logging::LoggerOutputStream::get(Channel, CAOSDB_LOG_LEVEL_TRACE)
    #define CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(Channel, FunctionName) \
    const caosdb::logging::TraceEnterLeaveLogger trace_enter_leave_logger(Channel, FunctionName);
    #define CAOSDB_LOG_ERROR_AND_RETURN_STATUS(Channel, StatusCode, Message) \
    CAOSDB_LOG_ERROR(Channel) << "StatusCode (" << StatusCode << ") " \
    << caosdb::get_status_description(StatusCode) << ": " << Message; \
    ......
    ......@@ -31,14 +31,16 @@
    #include "caosdb/protobuf_helper.h" // for get_arena
    #include "caosdb/status_code.h" // for StatusCode
    #include "caosdb/transaction_status.h" // for StatusCode
    #include <algorithm> // for max
    #include <future> // for async, future
    #include <google/protobuf/arena.h> // for Arena
    #include <google/protobuf/util/json_util.h> // for MessageToJsonS...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <algorithm> // for max
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <iterator> // for iterator, next
    #include <map> // for map
    // IWYU pragma: no_include <ext/alloc_traits.h>
    #include <memory> // for unique_ptr
    #include <mutex> // for mutex
    #include <string> // for string
    #include <utility> // for move
    #include <vector> // for vector
    ......@@ -48,7 +50,8 @@
    * query) can be added as a sub-request to a transaction.
    */
    #define ASSERT_CAN_ADD_RETRIEVAL \
    if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \
    if (this->status.GetCode() != StatusCode::INITIAL && \
    this->status.GetCode() != StatusCode::GO_ON) { \
    return StatusCode::TRANSACTION_STATUS_ERROR; \
    } \
    switch (this->transaction_type) { \
    ......@@ -84,7 +87,8 @@
    * sub-request to a transaction.
    */
    #define ASSERT_CAN_ADD_DELETION \
    if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \
    if (this->status.GetCode() != StatusCode::INITIAL && \
    this->status.GetCode() != StatusCode::GO_ON) { \
    return StatusCode::TRANSACTION_STATUS_ERROR; \
    } \
    switch (this->transaction_type) { \
    ......@@ -106,7 +110,8 @@
    * sub-request to a transaction.
    */
    #define ASSERT_CAN_ADD_INSERTION \
    if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \
    if (this->status.GetCode() != StatusCode::INITIAL && \
    this->status.GetCode() != StatusCode::GO_ON) { \
    return StatusCode::TRANSACTION_STATUS_ERROR; \
    } \
    switch (this->transaction_type) { \
    ......@@ -128,7 +133,8 @@
    * sub-request to a transaction.
    */
    #define ASSERT_CAN_ADD_UPDATE \
    if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \
    if (this->status.GetCode() != StatusCode::INITIAL && \
    this->status.GetCode() != StatusCode::GO_ON) { \
    return StatusCode::TRANSACTION_STATUS_ERROR; \
    } \
    switch (this->transaction_type) { \
    ......@@ -174,6 +180,7 @@ using caosdb::entity::v1::MultiTransactionResponse;
    using caosdb::entity::v1::RegisterFileUploadRequest;
    using caosdb::entity::v1::RegisterFileUploadResponse;
    using caosdb::transaction::TransactionStatus;
    using RetrieveResponse = caosdb::entity::v1::RetrieveResponse;
    using TransactionResponseCase = caosdb::entity::v1::TransactionResponse::TransactionResponseCase;
    using caosdb::utility::get_arena;
    using google::protobuf::Arena;
    ......@@ -361,10 +368,6 @@ public:
    */
    auto DeleteById(const std::string &id) noexcept -> StatusCode;
    inline auto IsStatus(const TransactionStatus &status) const noexcept -> bool {
    return this->status.GetCode() == status.GetCode();
    };
    /**
    * Execute this transaction in blocking mode and return the status.
    */
    ......@@ -375,8 +378,7 @@ public:
    *
    * A client may request the current status at any time via GetStatus().
    *
    * Use WaitForIt() to join the back-ground execution of this transaction, otherwise the behaviour
    * of getting the ResultSet is undefined.
    * Use WaitForIt() to join the back-ground execution of this transaction.
    */
    auto ExecuteAsynchronously() noexcept -> StatusCode;
    ......@@ -387,7 +389,7 @@ public:
    * Use this after ExecuteAsynchronously(), otherwise the TransactionStatus still remains
    * EXECUTING.
    */
    [[nodiscard]] auto WaitForIt() const noexcept -> TransactionStatus;
    auto WaitForIt() const noexcept -> TransactionStatus;
    /**
    * Return the current status of the transaction.
    ......@@ -398,12 +400,11 @@ public:
    * Return the ResultSet of this transaction.
    *
    * Note: If this method is called before the transaction has terminated
    * (GetStatus().GetCode() < 0) this method has undefined behavior.
    *
    * Instead, do Execute() or WaitForIt() and only call this method afterwards.
    * successfully (as indicated by GetStatus().GetCode == 0) this method has
    * undefined behavior.
    */
    [[nodiscard]] inline auto GetResultSet() const noexcept -> const ResultSet & {
    if (this->result_set == nullptr) {
    if (this->GetStatus().GetCode() < 0) {
    CAOSDB_LOG_ERROR(logger_name)
    << "GetResultSet was called before the transaction has terminated. This is a programming "
    "error of the code which uses the transaction.";
    ......@@ -411,7 +412,12 @@ public:
    // terminates and the result_set is being overriden, the unique_ptr
    // created here will be deleted and any client of the return ResultSet
    // will have a SegFault.
    this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>());
    } else if (this->GetStatus().GetCode() == StatusCode::SPOILED) {
    CAOSDB_LOG_ERROR(logger_name)
    << "GetResultSet was called on a \"spoiled\" transaction. That means "
    "that the result set has already been released via "
    "ReleaseResultSet(). This is a programming error of the code which "
    "uses the transaction.";
    }
    return *(this->result_set.get());
    }
    ......@@ -428,7 +434,9 @@ public:
    */
    [[nodiscard]] inline auto ReleaseResultSet() noexcept -> const ResultSet * {
    this->status = TransactionStatus::SPOILED();
    return this->result_set.release();
    auto result_set = this->result_set.release();
    this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>());
    return result_set;
    }
    /**
    ......@@ -481,22 +489,21 @@ public:
    */
    inline auto GetUploadFiles() const -> const std::vector<FileDescriptor> & { return upload_files; }
    protected:
    /**
    * Await and process the current handler's results.
    * Cancel this transaction after it has been started with
    * ExecuteAsynchronously().
    *
    * This implies consecutive calls to the handler's OnNext function.
    * This will cancel any active handler and drains the completion_queue.
    */
    auto ProcessCalls() -> TransactionStatus;
    auto Cancel() -> void;
    protected:
    /**
    * Cancels any active handler and drains the completion_queue.
    * Await and process the current handler's results.
    *
    * Can stay protected until ExecuteAsynchronously() is actually asynchronous.
    * Then it is also intended for aborting an execution after it has already
    * started.
    * This implies consecutive calls to the handler's OnNext function.
    */
    auto Cancel() -> void;
    auto ProcessCalls() -> TransactionStatus;
    /**
    * Return the Arena where this transaction may create Message instances.
    ......@@ -508,6 +515,22 @@ protected:
    inline auto GetArena() const -> Arena * { return get_arena(); }
    private:
    /**
    * Await and process the termination of this transaction.
    *
    * To be called at the end of DoExecuteTransaction on success.
    */
    auto ProcessTerminated() const noexcept -> void;
    auto ProcessRetrieveResponse(RetrieveResponse *retrieve_response,
    std::vector<std::unique_ptr<Entity>> *entities,
    bool *set_error) const noexcept -> std::unique_ptr<Entity>;
    /**
    * This functions actually does all the work. Also, it is the one which is
    * going to be send to the background thread by ExecuteAsynchronously.
    */
    auto DoExecuteTransaction() noexcept -> void;
    mutable std::mutex transaction_mutex;
    mutable std::future<void> transaction_future;
    grpc::CompletionQueue completion_queue;
    std::unique_ptr<HandlerInterface> handler_;
    ......@@ -516,7 +539,6 @@ private:
    bool has_query = false;
    TransactionType transaction_type = TransactionType::NONE;
    mutable std::unique_ptr<ResultSet> result_set;
    mutable TransactionStatus status = TransactionStatus::INITIAL();
    std::shared_ptr<EntityTransactionService::Stub> entity_service;
    std::shared_ptr<FileTransmissionService::Stub> file_service;
    ......@@ -524,6 +546,7 @@ private:
    mutable MultiTransactionResponse *response;
    std::string error_message;
    mutable long query_count;
    mutable std::unique_ptr<ResultSet> result_set;
    };
    template <class InputIterator>
    ......
    ......@@ -3,8 +3,8 @@
    #include "caosdb/entity/v1/main.pb.h" // for FileDownloadResponse
    #include "caosdb/handler_interface.h" // for HandlerTag
    #include "caosdb/unary_rpc_handler.h" // for HandlerTag, Handl...
    #include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRespons...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <grpcpp/support/async_unary_call.h> // for ClientAsyncResponseReader
    #include <memory> // for unique_ptr
    namespace caosdb::transaction {
    ......
    ......@@ -92,6 +92,13 @@ public:
    * This status means that the transaction has been executed successfully.
    */
    CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(SUCCESS, StatusCode::SUCCESS)
    /**
    * Factory for a CANCELLED status.
    *
    * This status means that the transaction has been canceled and should not be
    * used anymore.
    */
    CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(CANCELLED, StatusCode::CANCELLED)
    /**
    * Factory for a CONNECTION_ERROR status.
    *
    ......
    ......@@ -51,9 +51,9 @@
    #include "caosdb/handler_interface.h" // for HandlerTag, Handl...
    #include "caosdb/transaction_status.h" // for TransactionStatus
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/client_context.h> // for ClientContext
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <grpcpp/support/status.h> // for Status
    namespace caosdb::transaction {
    ......@@ -63,9 +63,11 @@ public:
    : HandlerInterface(), state_(CallState::NewCall), completion_queue(completion_queue) {}
    void Start() override {
    if (state_ == CallState::NewCall) {
    transaction_status = TransactionStatus::EXECUTING();
    OnNext(true);
    }
    }
    bool OnNext(bool ok) override;
    ......
    ......@@ -19,10 +19,6 @@
    *
    */
    #include "caosdb/authentication.h"
    #include <grpcpp/impl/codegen/interceptor.h> // for Status
    #include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext
    #include <grpcpp/impl/codegen/status.h> // for Status, Status::OK
    #include <grpcpp/impl/codegen/string_ref.h> // for string_ref
    #include <grpcpp/security/credentials.h> // for MetadataCredentialsPlugin
    #include <map> // for multimap
    #include <memory> // for allocator, shared_ptr
    ......
    ......@@ -27,10 +27,10 @@
    #include "caosdb/info/v1/main.pb.h" // for GetVersionInfoRequest
    #include "caosdb/transaction.h" // for Transaction
    #include "caosdb/transaction_status.h" // for TransactionStatus
    #include "grpcpp/impl/codegen/status_code_enum.h" // for StatusCode, UNAUTH...
    #include <grpcpp/client_context.h> // for ClientContext
    #include <grpcpp/create_channel.h> // for CreateChannel
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/support/status.h> // for Status
    #include <grpcpp/support/status_code_enum.h> // for StatusCode, UNAUTHENTIC...
    #include <string> // for string, operator+
    namespace caosdb::connection {
    ......
    ......@@ -52,18 +52,18 @@
    #include "caosdb/status_code.h" // for GENERIC_RPC_E...
    #include "caosdb/transaction_status.h" // for TransactionStatus
    #include <exception> // IWYU pragma: keep
    // IWYU pragma: no_include <bits/exception.h>
    #include <filesystem> // for operator<<, path
    #include <google/protobuf/arena.h> // for Arena
    #include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncRe...
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT...
    #include <grpcpp/client_context.h> // for ClientContext
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <grpcpp/support/async_stream.h> // for ClientAsyncReader
    #include <grpcpp/support/status.h> // for Status
    #include <grpcpp/support/status_code_enum.h> // for OK
    #include <stdexcept> // for runtime_error
    #include <string> // for string, opera...
    #include <utility> // for move
    // IWYU pragma: no_include <bits/exception.h>
    //
    namespace caosdb::transaction {
    using caosdb::StatusCode;
    using caosdb::utility::get_arena;
    ......@@ -116,7 +116,10 @@ bool DownloadRequestHandler::OnNext(bool ok) {
    return true;
    }
    void DownloadRequestHandler::Cancel() { ctx_.TryCancel(); }
    void DownloadRequestHandler::Cancel() {
    state_ = CallState::CallComplete;
    ctx_.TryCancel();
    }
    void DownloadRequestHandler::handleNewCallState() {
    CAOSDB_LOG_TRACE(logger_name) << "Enter DownloadRequestHandler::handleNewCallState. local_path = "
    ......
    ......@@ -48,8 +48,6 @@
    */
    #include "caosdb/file_transmission/register_file_upload_handler.h"
    #include "caosdb/logging.h" // for CAOSDB_LOG_TRACE
    #include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    namespace caosdb::transaction {
    ......
    ......@@ -54,17 +54,13 @@
    #include <algorithm> // for min
    #include <cstdint> // for uint64_t
    #include <exception> // IWYU pragma: keep
    // IWYU pragma: no_include <bits/exception.h>
    #include <filesystem> // for operator<<, path
    #include <google/protobuf/arena.h> // for Arena
    #include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWr...
    #include <grpcpp/impl/codegen/call_op_set.h> // for WriteOptions
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT...
    #include <grpcpp/support/status_code_enum.h> // for OK
    #include <string> // for basic_string
    #include <utility> // for move
    // IWYU pragma: no_include <bits/exception.h>
    namespace caosdb::transaction {
    using caosdb::StatusCode;
    ......@@ -119,7 +115,10 @@ bool UploadRequestHandler::OnNext(bool ok) {
    return true;
    }
    void UploadRequestHandler::Cancel() { ctx_.TryCancel(); }
    void UploadRequestHandler::Cancel() {
    state_ = CallState::CallComplete;
    ctx_.TryCancel();
    }
    void UploadRequestHandler::handleNewCallState() {
    auto filename = file_descriptor_.local_path;
    ......
    ......@@ -40,7 +40,6 @@
    #include <boost/smart_ptr/shared_ptr.hpp>
    #include <cstdint> // for uint64_t
    #include <memory>
    #include <ostream> // for ostream
    #include <sstream>
    #include <string>
    #include <utility> // for move
    ......
    ......@@ -30,14 +30,16 @@
    #include <algorithm> // for max
    #include <exception> // IWYU pragma: keep
    #include <filesystem> // for operator<<, path
    #include <future> // for async, future
    #include <google/protobuf/arena.h> // for Arena
    #include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <map> // for map, operator!=
    #include <memory> // for unique_ptr
    #include <random> // for mt19937, rand...
    #include <system_error> // for std::system_error
    #include <utility> // for move, pair
    // IWYU pragma: no_include <bits/exception.h>
    // IWYU pragma: no_include <cxxabi.h>
    // IWYU pragma: no_include "net/proto2/public/repeated_field.h"
    namespace caosdb::transaction {
    ......@@ -47,8 +49,10 @@ using caosdb::entity::v1::MultiTransactionRequest;
    using caosdb::entity::v1::MultiTransactionResponse;
    using TransactionResponseCase = caosdb::entity::v1::TransactionResponse::TransactionResponseCase;
    using RetrieveResponseCase = caosdb::entity::v1::RetrieveResponse::RetrieveResponseCase;
    using RetrieveResponse = caosdb::entity::v1::RetrieveResponse;
    using ProtoEntity = caosdb::entity::v1::Entity;
    using caosdb::entity::v1::EntityRequest;
    using google::protobuf::Arena;
    using NextStatus = grpc::CompletionQueue::NextStatus;
    using RegistrationStatus = caosdb::entity::v1::RegistrationStatus;
    ......@@ -86,7 +90,8 @@ Transaction::Transaction(std::shared_ptr<EntityTransactionService::Stub> entity_
    std::shared_ptr<FileTransmissionService::Stub> file_service)
    : entity_service(std::move(entity_service)), file_service(std::move(file_service)),
    request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())),
    response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1) {}
    response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1),
    result_set(std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>())) {}
    auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode {
    ASSERT_CAN_ADD_RETRIEVAL
    ......@@ -194,59 +199,62 @@ auto Transaction::Execute() -> TransactionStatus {
    return status;
    }
    // TODO(tf) This has apparently a cognitive complexity of 39>25 (threshold).
    auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT
    if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) {
    return StatusCode::TRANSACTION_STATUS_ERROR;
    }
    switch (this->transaction_type) {
    case MIXED_READ_AND_WRITE:
    CAOSDB_LOG_ERROR_AND_RETURN_STATUS(
    logger_name, StatusCode::UNSUPPORTED_FEATURE,
    "MIXED_READ_AND_WRITE UNSUPPORTED: The current implementation does not support "
    "mixed read and write transactions (containing retrievals, insertions, "
    "deletions, and updates in one transaction).")
    default:
    break;
    }
    this->status = TransactionStatus::EXECUTING();
    // NOLINTNEXTLINE
    #define TRANSACTION_SYNCRONIZED_BLOCK \
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    // NOLINTNEXTLINE
    auto Transaction::DoExecuteTransaction() noexcept -> void {
    // upload files first
    if (!upload_files.empty()) {
    CAOSDB_LOG_INFO(logger_name) << "Number of files to be uploaded: " << upload_files.size();
    auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(GetArena());
    auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(GetArena());
    auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(get_arena());
    auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena());
    handler_ =
    std::make_unique<RegisterFileUploadHandler>(&handler_, file_service.get(), &completion_queue,
    registration_request, registration_response);
    {
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    handler_ = std::make_unique<RegisterFileUploadHandler>(
    &handler_, file_service.get(), &completion_queue, registration_request,
    registration_response);
    }
    }
    this->status = ProcessCalls();
    if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) {
    this->status = TransactionStatus::FILE_UPLOAD_ERROR();
    return StatusCode::EXECUTING;
    }
    for (auto &file_descriptor : upload_files) {
    file_descriptor.file_transmission_id->set_registration_id(
    registration_response->registration_id());
    {
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path;
    handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(),
    &completion_queue, file_descriptor);
    }
    }
    this->status = ProcessCalls();
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    return StatusCode::EXECUTING;
    // Return early, there has been an error.
    return;
    }
    }
    }
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    {
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString();
    handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(),
    &completion_queue, request, response);
    }
    }
    this->status = ProcessCalls();
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    return StatusCode::EXECUTING;
    }
    // file download afterwards
    ......@@ -267,38 +275,57 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT
    for (const auto &item : download_files) {
    auto file_descriptor(item.second);
    {
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path;
    handler_ = std::make_unique<DownloadRequestHandler>(&handler_, file_service.get(),
    &completion_queue, file_descriptor);
    }
    }
    this->status = ProcessCalls();
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    return StatusCode::EXECUTING;
    // this indicates an error during the download
    return;
    }
    }
    }
    return StatusCode::EXECUTING;
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    ProcessTerminated();
    }
    }
    // TODO(tf) This has apparently a cognitive complexity of 36>25 (threshold).
    auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    return this->status;
    auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() != StatusCode::READY && this->status.GetCode() != StatusCode::GO_ON) {
    return StatusCode::TRANSACTION_STATUS_ERROR;
    }
    this->status = TransactionStatus::SUCCESS();
    bool set_error = false;
    auto *responses = this->response->mutable_responses();
    std::vector<std::unique_ptr<Entity>> entities;
    for (auto &sub_response : *responses) {
    std::unique_ptr<Entity> result;
    switch (sub_response.transaction_response_case()) {
    switch (this->transaction_type) {
    case MIXED_READ_AND_WRITE:
    CAOSDB_LOG_ERROR_AND_RETURN_STATUS(
    logger_name, StatusCode::UNSUPPORTED_FEATURE,
    "MIXED_WRITE UNSUPPORTED: The current implementation does not support "
    "mixed read and write transactions (containing retrievals, insertions, "
    "deletions, and updates in one transaction).")
    default:
    break;
    }
    this->status = TransactionStatus::EXECUTING();
    case TransactionResponseCase::kRetrieveResponse: {
    auto *retrieve_response = sub_response.mutable_retrieve_response();
    this->transaction_future = std::async(std::launch::async, [this]() { DoExecuteTransaction(); });
    return StatusCode::EXECUTING;
    }
    auto Transaction::ProcessRetrieveResponse(RetrieveResponse *retrieve_response,
    std::vector<std::unique_ptr<Entity>> *entities,
    bool *set_error) const noexcept
    -> std::unique_ptr<Entity> {
    std::unique_ptr<Entity> result;
    switch (retrieve_response->retrieve_response_case()) {
    case RetrieveResponseCase::kEntityResponse: {
    auto *retrieve_entity_response = retrieve_response->mutable_entity_response();
    auto *retrieve_entity_response = retrieve_response->release_entity_response();
    result = std::make_unique<Entity>(retrieve_entity_response);
    } break;
    case RetrieveResponseCase::kSelectResult: {
    ......@@ -311,23 +338,35 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT
    } break;
    case RetrieveResponseCase::kFindResult: {
    std::unique_ptr<Entity> find_result;
    for (auto &entity_response :
    *retrieve_response->mutable_find_result()->mutable_result_set()) {
    for (auto &entity_response : *retrieve_response->mutable_find_result()->mutable_result_set()) {
    find_result = std::make_unique<Entity>(&entity_response);
    if (find_result->HasErrors()) {
    set_error = true;
    *set_error = true;
    }
    entities.push_back(std::move(find_result));
    entities->push_back(std::move(find_result));
    }
    } break;
    default:
    CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase.";
    break;
    }
    return result;
    }
    auto Transaction::ProcessTerminated() const noexcept -> void {
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessTerminated()")
    bool set_error = false;
    auto *responses = this->response->mutable_responses();
    std::vector<std::unique_ptr<Entity>> entities;
    for (auto &sub_response : *responses) {
    std::unique_ptr<Entity> result;
    switch (sub_response.transaction_response_case()) {
    case TransactionResponseCase::kRetrieveResponse: {
    auto *retrieve_response = sub_response.mutable_retrieve_response();
    result = ProcessRetrieveResponse(retrieve_response, &entities, &set_error);
    break; // break TransactionResponseCase::kRetrieveResponse
    }
    case TransactionResponseCase::kInsertResponse: {
    auto *inserted_id_response = sub_response.mutable_insert_response()->mutable_id_response();
    result = std::make_unique<Entity>(inserted_id_response);
    ......@@ -367,19 +406,41 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT
    if (set_error) {
    this->status = TransactionStatus::TRANSACTION_ERROR("The request terminated with errors.");
    } else {
    this->status = TransactionStatus::SUCCESS();
    }
    }
    auto Transaction::WaitForIt() const noexcept -> TransactionStatus {
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    return this->status;
    }
    this->transaction_future.wait();
    return this->status;
    }
    auto Transaction::ProcessCalls() -> TransactionStatus {
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessCalls()") {
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    CAOSDB_LOG_ERROR(logger_name)
    << "Transaction::ProcessCalls() was called, TransactionStatus was: "
    << std::to_string(this->status.GetCode()) << " - " << this->status.GetDescription();
    return status;
    }
    handler_->Start();
    }
    gpr_timespec deadline;
    deadline.tv_sec = 1;
    deadline.tv_nsec = 0;
    deadline.clock_type = gpr_clock_type::GPR_TIMESPAN;
    TransactionStatus result = TransactionStatus::EXECUTING();
    handler_->Start();
    void *tag = nullptr;
    bool ok = false;
    while (true) {
    ......@@ -390,20 +451,17 @@ auto Transaction::ProcessCalls() -> TransactionStatus {
    if (!res) {
    // The handler has finished it's work
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    }
    } else {
    std::string description("Invalid tag delivered by notification queue.");
    CAOSDB_LOG_ERROR(logger_name) << description;
    handler_.reset();
    return TransactionStatus::RPC_ERROR(description);
    }
    } break;
    case NextStatus::SHUTDOWN: {
    CAOSDB_LOG_ERROR(logger_name) << "Notification queue has been shut down unexpectedly.";
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    } break;
    case NextStatus::TIMEOUT: {
    ......@@ -412,17 +470,35 @@ auto Transaction::ProcessCalls() -> TransactionStatus {
    default:
    CAOSDB_LOG_FATAL(logger_name) << "Got an invalid NextStatus from CompletionQueue.";
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    }
    }
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    }
    Transaction::~Transaction() {
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::~Transaction()")
    this->Cancel();
    }
    void Transaction::Cancel() {
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::Cancel()")
    if (this->status.GetCode() == StatusCode::CANCELLED) {
    return;
    }
    {
    TRANSACTION_SYNCRONIZED_BLOCK
    if (handler_ != nullptr) {
    handler_->Cancel();
    }
    this->status = TransactionStatus::CANCELLED();
    completion_queue.Shutdown();
    ......@@ -434,10 +510,8 @@ Transaction::~Transaction() {
    }
    }
    void Transaction::Cancel() {
    // TODO(tf) State Canceled
    if (handler_ != nullptr) {
    handler_->Cancel();
    if (transaction_future.valid()) {
    transaction_future.wait();
    }
    }
    ......
    ......@@ -2,8 +2,6 @@
    #include "caosdb/logging.h" // for CAOSDB_LOG_TRACE
    #include <exception> // IWYU pragma: keep
    // IWYU pragma: no_include <bits/exception.h>
    #include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    namespace caosdb::transaction {
    ......@@ -16,17 +14,13 @@ EntityTransactionHandler::EntityTransactionHandler(HandlerTag tag,
    response_(response) {}
    void EntityTransactionHandler::handleNewCallState() {
    CAOSDB_LOG_TRACE(logger_name) << "Enter EntityTransactionHandler::handleNewCallState with "
    "CompletionQueue "
    << completion_queue;
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "EntityTransactionHandler::handleNewCallState()")
    rpc_ = stub_->PrepareAsyncMultiTransaction(&call_context, *request_, completion_queue);
    state_ = CallState::CallComplete;
    rpc_->StartCall();
    rpc_->Finish(response_, &status_, tag_);
    CAOSDB_LOG_TRACE(logger_name) << "Leave EntityTransactionHandler::handleNewCallState";
    }
    } // namespace caosdb::transaction
    ......@@ -49,16 +49,15 @@
    #include "caosdb/unary_rpc_handler.h"
    #include "caosdb/logging.h" // for CAOSDB_LOG_TRACE
    #include "caosdb/status_code.h" // for GENERIC_RPC_E...
    // IWYU pragma: no_include <bits/exception.h>
    #include <exception> // IWYU pragma: keep
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT...
    #include <grpcpp/support/status_code_enum.h> // for OK
    #include <string> // for string, opera...
    // IWYU pragma: no_include <bits/exception.h>
    namespace caosdb::transaction {
    bool UnaryRpcHandler::OnNext(bool ok) {
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "UnaryRpcHandler::OnNext(bool)")
    try {
    if (ok) {
    if (state_ == CallState::NewCall) {
    ......@@ -99,10 +98,14 @@ bool UnaryRpcHandler::OnNext(bool ok) {
    return false;
    }
    void UnaryRpcHandler::Cancel() { call_context.TryCancel(); }
    void UnaryRpcHandler::Cancel() {
    state_ = CallState::CallComplete;
    transaction_status = TransactionStatus::CANCELLED();
    call_context.TryCancel();
    }
    void UnaryRpcHandler::handleCallCompleteState() {
    CAOSDB_LOG_TRACE(logger_name) << "Enter UnaryRpcHandler::handleCallCompleteState";
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "UnaryRpcHandler::handleCallCompleteState()")
    switch (status_.error_code()) {
    case grpc::OK:
    ......@@ -117,8 +120,6 @@ void UnaryRpcHandler::handleCallCompleteState() {
    << "): " << description;
    break;
    }
    CAOSDB_LOG_TRACE(logger_name) << "Leave UnaryRpcHandler::handleCallCompleteState";
    }
    } // namespace caosdb::transaction
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment