Skip to content
Snippets Groups Projects
Verified Commit 02be7c66 authored by Timm Fitschen's avatar Timm Fitschen
Browse files

Bump grpc to 1.45.2, clean up transaction

parent 1ebb197c
No related branches found
No related tags found
2 merge requests!42Release 0.2.0,!28F async execute
Pipeline #24694 passed with warnings
Pipeline: caosdb-cppinttest

#24695

    ......@@ -14,7 +14,7 @@ class CaosdbConan(ConanFile):
    default_options = {"shared": False, "fPIC": True}
    generators = "cmake"
    requires = [
    ("grpc/1.39.1"),
    ("grpc/1.45.2"),
    ]
    build_requires = [
    ("boost/1.77.0"),
    ......
    ......@@ -28,11 +28,11 @@
    * @brief Configuration and setup of the client authentication.
    */
    #include "caosdb/utility.h" // for base64_encode
    #include <grpcpp/impl/codegen/interceptor.h> // for Status
    #include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/impl/codegen/string_ref.h> // for string_ref
    #include <grpcpp/security/auth_context.h> // for AuthContext
    #include <grpcpp/security/credentials.h> // for CallCredentials
    #include <grpcpp/support/interceptor.h> // for Status
    #include <grpcpp/support/status.h> // for Status
    #include <grpcpp/support/string_ref.h> // for string_ref
    #include <map> // for multimap
    #include <memory> // for shared_ptr
    #include <string> // for string
    ......
    ......@@ -47,6 +47,7 @@
    #include <string> // for string, basic...
    #include <utility> // for move
    #include <vector> // for vector
    // IWYU pragma: no_include "net/proto2/public/repeated_field.h"
    namespace caosdb::entity {
    using caosdb::entity::v1::IdResponse;
    ......
    ......@@ -35,7 +35,7 @@
    #include <future> // for async, future
    #include <google/protobuf/arena.h> // for Arena
    #include <google/protobuf/util/json_util.h> // for MessageToJsonS...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/completion_queue.h> // for CompletionQueue
    #include <iterator> // for iterator, next
    #include <map> // for map
    // IWYU pragma: no_include <ext/alloc_traits.h>
    ......@@ -378,8 +378,7 @@ public:
    *
    * A client may request the current status at any time via GetStatus().
    *
    * Use WaitForIt() to join the back-ground execution of this transaction, otherwise the behaviour
    * of getting the ResultSet is undefined.
    * Use WaitForIt() to join the back-ground execution of this transaction.
    */
    auto ExecuteAsynchronously() noexcept -> StatusCode;
    ......@@ -390,7 +389,7 @@ public:
    * Use this after ExecuteAsynchronously(), otherwise the TransactionStatus still remains
    * EXECUTING.
    */
    [[nodiscard]] auto WaitForIt() const noexcept -> TransactionStatus;
    auto WaitForIt() const noexcept -> TransactionStatus;
    /**
    * Return the current status of the transaction.
    ......@@ -401,20 +400,20 @@ public:
    * Return the ResultSet of this transaction.
    *
    * Note: If this method is called before the transaction has terminated
    * (GetStatus().GetCode() < 0) this method has undefined behavior.
    *
    * Instead, do Execute() or WaitForIt() and only call this method afterwards.
    * successfully (as indicated by GetStatus().GetCode == 0) this method has
    * undefined behavior.
    */
    [[nodiscard]] inline auto GetResultSet() const noexcept -> const ResultSet & {
    if (this->result_set == nullptr) {
    if (this->GetStatus().GetCode() < 0) {
    CAOSDB_LOG_ERROR(logger_name)
    << "GetResultSet was called before the transaction has terminated. This is a programming "
    "error of the code which uses the transaction.";
    // TODO(tf) This is a really bad SegFault factory. When the transaction
    // terminates and the result_set is being overriden, the unique_ptr
    // created here will be deleted and any client of the return ResultSet
    // will have a SegFault.
    this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>());
    } else if (this->GetStatus().GetCode() == StatusCode::SPOILED) {
    CAOSDB_LOG_ERROR(logger_name)
    << "GetResultSet was called on a \"spoiled\" transaction. That means "
    "that the result set has already been released via "
    "ReleaseResultSet(). This is a programming error of the code which "
    "uses the transaction.";
    }
    return *(this->result_set.get());
    }
    ......@@ -431,7 +430,9 @@ public:
    */
    [[nodiscard]] inline auto ReleaseResultSet() noexcept -> const ResultSet * {
    this->status = TransactionStatus::SPOILED();
    return this->result_set.release();
    auto result_set = this->result_set.release();
    this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>());
    return result_set;
    }
    /**
    ......@@ -484,22 +485,21 @@ public:
    */
    inline auto GetUploadFiles() const -> const std::vector<FileDescriptor> & { return upload_files; }
    protected:
    /**
    * Await and process the current handler's results.
    * Cancel this transaction after it has been started with
    * ExecuteAsynchronously().
    *
    * This implies consecutive calls to the handler's OnNext function.
    * This will cancel any active handler and drains the completion_queue.
    */
    auto ProcessCalls() -> TransactionStatus;
    auto Cancel() -> void;
    protected:
    /**
    * Cancels any active handler and drains the completion_queue.
    * Await and process the current handler's results.
    *
    * Can stay protected until ExecuteAsynchronously() is actually asynchronous.
    * Then it is also intended for aborting an execution after it has already
    * started.
    * This implies consecutive calls to the handler's OnNext function.
    */
    auto Cancel() -> void;
    auto ProcessCalls() -> TransactionStatus;
    /**
    * Return the Arena where this transaction may create Message instances.
    ......@@ -511,13 +511,22 @@ protected:
    inline auto GetArena() const -> Arena * { return get_arena(); }
    private:
    auto ProcessTerminated() const noexcept -> TransactionStatus;
    /**
    * Await and process the termination of this transaction.
    *
    * To be called at the end of DoExecuteTransaction on success.
    */
    auto ProcessTerminated() const noexcept -> void;
    auto ProcessRetrieveResponse(RetrieveResponse *retrieve_response,
    std::vector<std::unique_ptr<Entity>> *entities,
    bool *set_error) const noexcept -> std::unique_ptr<Entity>;
    auto DoExecuteTransaction() noexcept -> StatusCode;
    /**
    * This functions actually does all the work. Also, it is the one which is
    * going to be send to the background thread by ExecuteAsynchronously.
    */
    auto DoExecuteTransaction() noexcept -> void;
    mutable std::mutex transaction_mutex;
    mutable std::future<StatusCode> transaction_future;
    mutable std::future<void> transaction_future;
    grpc::CompletionQueue completion_queue;
    std::unique_ptr<HandlerInterface> handler_;
    ......@@ -526,7 +535,6 @@ private:
    bool has_query = false;
    TransactionType transaction_type = TransactionType::NONE;
    mutable std::unique_ptr<ResultSet> result_set;
    mutable TransactionStatus status = TransactionStatus::INITIAL();
    std::shared_ptr<EntityTransactionService::Stub> entity_service;
    std::shared_ptr<FileTransmissionService::Stub> file_service;
    ......@@ -534,6 +542,7 @@ private:
    mutable MultiTransactionResponse *response;
    std::string error_message;
    mutable long query_count;
    mutable std::unique_ptr<ResultSet> result_set;
    };
    template <class InputIterator>
    ......
    ......@@ -19,10 +19,6 @@
    *
    */
    #include "caosdb/authentication.h"
    #include <grpcpp/impl/codegen/interceptor.h> // for Status
    #include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext
    #include <grpcpp/impl/codegen/status.h> // for Status, Status::OK
    #include <grpcpp/impl/codegen/string_ref.h> // for string_ref
    #include <grpcpp/security/credentials.h> // for MetadataCredentialsPlugin
    #include <map> // for multimap
    #include <memory> // for allocator, shared_ptr
    ......
    ......@@ -27,10 +27,10 @@
    #include "caosdb/info/v1/main.pb.h" // for GetVersionInfoRequest
    #include "caosdb/transaction.h" // for Transaction
    #include "caosdb/transaction_status.h" // for TransactionStatus
    #include "grpcpp/impl/codegen/status_code_enum.h" // for StatusCode, UNAUTH...
    #include <grpcpp/client_context.h> // for ClientContext
    #include <grpcpp/create_channel.h> // for CreateChannel
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/support/status.h> // for Status
    #include <grpcpp/support/status_code_enum.h> // for StatusCode, UNAUTHENTIC...
    #include <string> // for string, operator+
    namespace caosdb::connection {
    ......
    ......@@ -29,19 +29,22 @@
    #include "caosdb/status_code.h" // for StatusCode
    #include "caosdb/transaction_handler.h" // for EntityTransactionHandler
    #include <algorithm> // for max
    //
    #include <chrono> // for chrono_literals
    #include <exception> // IWYU pragma: keep
    #include <filesystem> // for operator<<, path
    #include <future> // for async, future
    #include <google/protobuf/arena.h> // for Arena
    #include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec
    #include <map> // for map, operator!=
    #include <memory> // for unique_ptr
    #include <random> // for mt19937, rand...
    #include <system_error> // for std::system_error
    #include <thread> // for sleep
    #include <utility> // for move, pair
    // IWYU pragma: no_include <bits/exception.h>
    // IWYU pragma: no_include <cxxabi.h>
    #include <exception> // IWYU pragma: keep
    #include <filesystem> // for operator<<, path
    #include <future> // for async, future
    #include <google/protobuf/arena.h> // for Arena
    #include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <map> // for map, operator!=
    #include <memory> // for unique_ptr
    #include <random> // for mt19937, rand...
    #include <system_error> // for std::system_error
    #include <utility> // for move, pair
    // IWYU pragma: no_include "net/proto2/public/repeated_field.h"
    namespace caosdb::transaction {
    using caosdb::entity::v1::EntityTransactionService;
    ......@@ -91,7 +94,8 @@ Transaction::Transaction(std::shared_ptr<EntityTransactionService::Stub> entity_
    std::shared_ptr<FileTransmissionService::Stub> file_service)
    : entity_service(std::move(entity_service)), file_service(std::move(file_service)),
    request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())),
    response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1) {}
    response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1),
    result_set(std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>())) {}
    auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode {
    ASSERT_CAN_ADD_RETRIEVAL
    ......@@ -200,7 +204,11 @@ auto Transaction::Execute() -> TransactionStatus {
    }
    // NOLINTNEXTLINE
    auto Transaction::DoExecuteTransaction() noexcept -> StatusCode {
    #define TRANSACTION_SYNCRONIZED_BLOCK \
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    // NOLINTNEXTLINE
    auto Transaction::DoExecuteTransaction() noexcept -> void {
    // upload files first
    if (!upload_files.empty()) {
    CAOSDB_LOG_INFO(logger_name) << "Number of files to be uploaded: " << upload_files.size();
    ......@@ -209,7 +217,7 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode {
    auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena());
    {
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    handler_ = std::make_unique<RegisterFileUploadHandler>(
    &handler_, file_service.get(), &completion_queue, registration_request,
    ......@@ -220,14 +228,13 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode {
    if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) {
    this->status = TransactionStatus::FILE_UPLOAD_ERROR();
    return this->status.GetCode();
    }
    for (auto &file_descriptor : upload_files) {
    file_descriptor.file_transmission_id->set_registration_id(
    registration_response->registration_id());
    {
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path;
    handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(),
    ......@@ -236,15 +243,15 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode {
    }
    this->status = ProcessCalls();
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    // this indicates an error during the upload
    return this->status.GetCode();
    // Return early, there has been an error.
    return;
    }
    }
    }
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    {
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString();
    handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(),
    ......@@ -273,7 +280,7 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode {
    for (const auto &item : download_files) {
    auto file_descriptor(item.second);
    {
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path;
    ......@@ -284,19 +291,17 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode {
    this->status = ProcessCalls();
    if (this->status.GetCode() != StatusCode::EXECUTING) {
    // this indicates an error during the download
    break;
    return this->status.GetCode();
    return;
    }
    }
    }
    if (this->status.GetCode() == StatusCode::EXECUTING) {
    return ProcessTerminated().GetCode();
    ProcessTerminated();
    }
    return this->status.GetCode();
    }
    auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() != StatusCode::READY && this->status.GetCode() != StatusCode::GO_ON) {
    return StatusCode::TRANSACTION_STATUS_ERROR;
    }
    ......@@ -312,8 +317,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    }
    this->status = TransactionStatus::EXECUTING();
    this->transaction_future =
    std::async(std::launch::async, [this]() { return this->DoExecuteTransaction(); });
    this->transaction_future = std::async(std::launch::async, [this]() { DoExecuteTransaction(); });
    return StatusCode::EXECUTING;
    }
    ......@@ -353,7 +357,7 @@ auto Transaction::ProcessRetrieveResponse(RetrieveResponse *retrieve_response,
    return result;
    }
    auto Transaction::ProcessTerminated() const noexcept -> TransactionStatus {
    auto Transaction::ProcessTerminated() const noexcept -> void {
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessTerminated()")
    bool set_error = false;
    auto *responses = this->response->mutable_responses();
    ......@@ -409,8 +413,6 @@ auto Transaction::ProcessTerminated() const noexcept -> TransactionStatus {
    } else {
    this->status = TransactionStatus::SUCCESS();
    }
    return this->status;
    }
    auto Transaction::WaitForIt() const noexcept -> TransactionStatus {
    ......@@ -485,7 +487,18 @@ Transaction::~Transaction() {
    void Transaction::Cancel() {
    CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::Cancel()")
    const std::lock_guard<std::mutex> lock(this->transaction_mutex);
    TRANSACTION_SYNCRONIZED_BLOCK
    if (this->status.GetCode() > 0) {
    // Prevent canceling before the queue even started.
    // Temporary fix for a bug in GRPC.
    // Fix is in the making:
    // https://github.com/grpc/grpc/pull/30004
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(5ms);
    }
    this->status = TransactionStatus::CANCELLED();
    if (handler_ != nullptr) {
    ......
    ......@@ -39,6 +39,7 @@
    #include <stdexcept> // for out_of_range
    #include <string> // for operator+, to_string
    #include <utility> // for move
    // IWYU pragma: no_include "net/proto2/public/repeated_field.h"
    namespace caosdb::entity {
    using caosdb::entity::v1::IdResponse;
    ......
    ......@@ -42,6 +42,8 @@ TEST(test_issues, test_issue_11) {
    EXPECT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously());
    // Trying to obtain ResultSet while it is still empty.
    EXPECT_EQ(transaction->GetResultSet().size(), 0);
    transaction->WaitForIt();
    EXPECT_EQ(transaction->GetResultSet().size(), 0);
    }
    } // namespace caosdb::transaction
    ......@@ -37,6 +37,7 @@
    #include <string> // for string, basic_string
    #include <utility> // for move
    #include <vector> // for vector
    // IWYU pragma: no_include "net/proto2/public/repeated_field.h"
    namespace caosdb::transaction {
    using caosdb::configuration::InsecureConnectionConfiguration;
    ......@@ -265,6 +266,7 @@ TEST(test_transaction, test_multiple_execute) {
    EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING);
    EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR);
    EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR);
    transaction->Cancel();
    }
    TEST(test_transaction, test_multiple_wait_for_it) {
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment