diff --git a/conanfile.py b/conanfile.py index a773f501212cb23d3649aae31c644bd66a259516..c91038d2d9ab6e2f23c21696bd62d19c0421ca78 100644 --- a/conanfile.py +++ b/conanfile.py @@ -14,7 +14,7 @@ class CaosdbConan(ConanFile): default_options = {"shared": False, "fPIC": True} generators = "cmake" requires = [ - ("grpc/1.39.1"), + ("grpc/1.45.2"), ] build_requires = [ ("boost/1.77.0"), diff --git a/include/caosdb/authentication.h b/include/caosdb/authentication.h index 8e98a17e6d94fe118cf2fc478efbbf77733be4a6..ffc2cabf91ebb2f3e8c8bd150bfb5423cd10e583 100644 --- a/include/caosdb/authentication.h +++ b/include/caosdb/authentication.h @@ -28,11 +28,11 @@ * @brief Configuration and setup of the client authentication. */ #include "caosdb/utility.h" // for base64_encode -#include <grpcpp/impl/codegen/interceptor.h> // for Status -#include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext -#include <grpcpp/impl/codegen/status.h> // for Status -#include <grpcpp/impl/codegen/string_ref.h> // for string_ref +#include <grpcpp/security/auth_context.h> // for AuthContext #include <grpcpp/security/credentials.h> // for CallCredentials +#include <grpcpp/support/interceptor.h> // for Status +#include <grpcpp/support/status.h> // for Status +#include <grpcpp/support/string_ref.h> // for string_ref #include <map> // for multimap #include <memory> // for shared_ptr #include <string> // for string diff --git a/include/caosdb/entity.h b/include/caosdb/entity.h index 52a69e05083670a302a71b883993ae28122e7655..23b6e2f612a52e85df9eb2682b6cac11fd594063 100644 --- a/include/caosdb/entity.h +++ b/include/caosdb/entity.h @@ -47,6 +47,7 @@ #include <string> // for string, basic... #include <utility> // for move #include <vector> // for vector +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::entity { using caosdb::entity::v1::IdResponse; diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index f0732e86a8faf55763632ec0114b1311b8f67648..0795abf07b45819d3b4dfe618558c7fa7c0461bb 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -35,7 +35,7 @@ #include <future> // for async, future #include <google/protobuf/arena.h> // for Arena #include <google/protobuf/util/json_util.h> // for MessageToJsonS... -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/completion_queue.h> // for CompletionQueue #include <iterator> // for iterator, next #include <map> // for map // IWYU pragma: no_include <ext/alloc_traits.h> @@ -378,8 +378,7 @@ public: * * A client may request the current status at any time via GetStatus(). * - * Use WaitForIt() to join the back-ground execution of this transaction, otherwise the behaviour - * of getting the ResultSet is undefined. + * Use WaitForIt() to join the back-ground execution of this transaction. */ auto ExecuteAsynchronously() noexcept -> StatusCode; @@ -390,7 +389,7 @@ public: * Use this after ExecuteAsynchronously(), otherwise the TransactionStatus still remains * EXECUTING. */ - [[nodiscard]] auto WaitForIt() const noexcept -> TransactionStatus; + auto WaitForIt() const noexcept -> TransactionStatus; /** * Return the current status of the transaction. @@ -401,20 +400,20 @@ public: * Return the ResultSet of this transaction. * * Note: If this method is called before the transaction has terminated - * (GetStatus().GetCode() < 0) this method has undefined behavior. - * - * Instead, do Execute() or WaitForIt() and only call this method afterwards. + * successfully (as indicated by GetStatus().GetCode == 0) this method has + * undefined behavior. */ [[nodiscard]] inline auto GetResultSet() const noexcept -> const ResultSet & { - if (this->result_set == nullptr) { + if (this->GetStatus().GetCode() < 0) { CAOSDB_LOG_ERROR(logger_name) << "GetResultSet was called before the transaction has terminated. This is a programming " "error of the code which uses the transaction."; - // TODO(tf) This is a really bad SegFault factory. When the transaction - // terminates and the result_set is being overriden, the unique_ptr - // created here will be deleted and any client of the return ResultSet - // will have a SegFault. - this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>()); + } else if (this->GetStatus().GetCode() == StatusCode::SPOILED) { + CAOSDB_LOG_ERROR(logger_name) + << "GetResultSet was called on a \"spoiled\" transaction. That means " + "that the result set has already been released via " + "ReleaseResultSet(). This is a programming error of the code which " + "uses the transaction."; } return *(this->result_set.get()); } @@ -431,7 +430,9 @@ public: */ [[nodiscard]] inline auto ReleaseResultSet() noexcept -> const ResultSet * { this->status = TransactionStatus::SPOILED(); - return this->result_set.release(); + auto result_set = this->result_set.release(); + this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>()); + return result_set; } /** @@ -484,22 +485,21 @@ public: */ inline auto GetUploadFiles() const -> const std::vector<FileDescriptor> & { return upload_files; } -protected: /** - * Await and process the current handler's results. + * Cancel this transaction after it has been started with + * ExecuteAsynchronously(). * - * This implies consecutive calls to the handler's OnNext function. + * This will cancel any active handler and drains the completion_queue. */ - auto ProcessCalls() -> TransactionStatus; + auto Cancel() -> void; +protected: /** - * Cancels any active handler and drains the completion_queue. + * Await and process the current handler's results. * - * Can stay protected until ExecuteAsynchronously() is actually asynchronous. - * Then it is also intended for aborting an execution after it has already - * started. + * This implies consecutive calls to the handler's OnNext function. */ - auto Cancel() -> void; + auto ProcessCalls() -> TransactionStatus; /** * Return the Arena where this transaction may create Message instances. @@ -511,13 +511,22 @@ protected: inline auto GetArena() const -> Arena * { return get_arena(); } private: - auto ProcessTerminated() const noexcept -> TransactionStatus; + /** + * Await and process the termination of this transaction. + * + * To be called at the end of DoExecuteTransaction on success. + */ + auto ProcessTerminated() const noexcept -> void; auto ProcessRetrieveResponse(RetrieveResponse *retrieve_response, std::vector<std::unique_ptr<Entity>> *entities, bool *set_error) const noexcept -> std::unique_ptr<Entity>; - auto DoExecuteTransaction() noexcept -> StatusCode; + /** + * This functions actually does all the work. Also, it is the one which is + * going to be send to the background thread by ExecuteAsynchronously. + */ + auto DoExecuteTransaction() noexcept -> void; mutable std::mutex transaction_mutex; - mutable std::future<StatusCode> transaction_future; + mutable std::future<void> transaction_future; grpc::CompletionQueue completion_queue; std::unique_ptr<HandlerInterface> handler_; @@ -526,7 +535,6 @@ private: bool has_query = false; TransactionType transaction_type = TransactionType::NONE; - mutable std::unique_ptr<ResultSet> result_set; mutable TransactionStatus status = TransactionStatus::INITIAL(); std::shared_ptr<EntityTransactionService::Stub> entity_service; std::shared_ptr<FileTransmissionService::Stub> file_service; @@ -534,6 +542,7 @@ private: mutable MultiTransactionResponse *response; std::string error_message; mutable long query_count; + mutable std::unique_ptr<ResultSet> result_set; }; template <class InputIterator> diff --git a/src/caosdb/authentication.cpp b/src/caosdb/authentication.cpp index ecfbe35344f557671d8529770f73a3987c389ab0..71dd48a645d1f0f8a5b946c307919672aca05898 100644 --- a/src/caosdb/authentication.cpp +++ b/src/caosdb/authentication.cpp @@ -19,10 +19,6 @@ * */ #include "caosdb/authentication.h" -#include <grpcpp/impl/codegen/interceptor.h> // for Status -#include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext -#include <grpcpp/impl/codegen/status.h> // for Status, Status::OK -#include <grpcpp/impl/codegen/string_ref.h> // for string_ref #include <grpcpp/security/credentials.h> // for MetadataCredentialsPlugin #include <map> // for multimap #include <memory> // for allocator, shared_ptr diff --git a/src/caosdb/connection.cpp b/src/caosdb/connection.cpp index 02c190ee1beff00fc3cbffa950628a67b5ffa32d..d2ef513c54edf4996f49f6d944f458f09876dfe9 100644 --- a/src/caosdb/connection.cpp +++ b/src/caosdb/connection.cpp @@ -27,10 +27,10 @@ #include "caosdb/info/v1/main.pb.h" // for GetVersionInfoRequest #include "caosdb/transaction.h" // for Transaction #include "caosdb/transaction_status.h" // for TransactionStatus -#include "grpcpp/impl/codegen/status_code_enum.h" // for StatusCode, UNAUTH... +#include <grpcpp/client_context.h> // for ClientContext #include <grpcpp/create_channel.h> // for CreateChannel -#include <grpcpp/impl/codegen/client_context.h> // for ClientContext -#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/support/status.h> // for Status +#include <grpcpp/support/status_code_enum.h> // for StatusCode, UNAUTHENTIC... #include <string> // for string, operator+ namespace caosdb::connection { diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index d30d6aa9f3fa06d8dee09bb5a3d7c35ba810fb81..bbd747e79a8cb7f3a5fc8c10a36f5db735c59bec 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -29,19 +29,22 @@ #include "caosdb/status_code.h" // for StatusCode #include "caosdb/transaction_handler.h" // for EntityTransactionHandler #include <algorithm> // for max + // +#include <chrono> // for chrono_literals +#include <exception> // IWYU pragma: keep +#include <filesystem> // for operator<<, path +#include <future> // for async, future +#include <google/protobuf/arena.h> // for Arena +#include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec +#include <map> // for map, operator!= +#include <memory> // for unique_ptr +#include <random> // for mt19937, rand... +#include <system_error> // for std::system_error +#include <thread> // for sleep +#include <utility> // for move, pair // IWYU pragma: no_include <bits/exception.h> // IWYU pragma: no_include <cxxabi.h> -#include <exception> // IWYU pragma: keep -#include <filesystem> // for operator<<, path -#include <future> // for async, future -#include <google/protobuf/arena.h> // for Arena -#include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <map> // for map, operator!= -#include <memory> // for unique_ptr -#include <random> // for mt19937, rand... -#include <system_error> // for std::system_error -#include <utility> // for move, pair +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::transaction { using caosdb::entity::v1::EntityTransactionService; @@ -91,7 +94,8 @@ Transaction::Transaction(std::shared_ptr<EntityTransactionService::Stub> entity_ std::shared_ptr<FileTransmissionService::Stub> file_service) : entity_service(std::move(entity_service)), file_service(std::move(file_service)), request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())), - response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1) {} + response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1), + result_set(std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>())) {} auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode { ASSERT_CAN_ADD_RETRIEVAL @@ -200,7 +204,11 @@ auto Transaction::Execute() -> TransactionStatus { } // NOLINTNEXTLINE -auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { +#define TRANSACTION_SYNCRONIZED_BLOCK \ + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + +// NOLINTNEXTLINE +auto Transaction::DoExecuteTransaction() noexcept -> void { // upload files first if (!upload_files.empty()) { CAOSDB_LOG_INFO(logger_name) << "Number of files to be uploaded: " << upload_files.size(); @@ -209,7 +217,7 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); { - const std::lock_guard<std::mutex> lock(this->transaction_mutex); + TRANSACTION_SYNCRONIZED_BLOCK if (this->status.GetCode() == StatusCode::EXECUTING) { handler_ = std::make_unique<RegisterFileUploadHandler>( &handler_, file_service.get(), &completion_queue, registration_request, @@ -220,14 +228,13 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { this->status = TransactionStatus::FILE_UPLOAD_ERROR(); - return this->status.GetCode(); } for (auto &file_descriptor : upload_files) { file_descriptor.file_transmission_id->set_registration_id( registration_response->registration_id()); { - const std::lock_guard<std::mutex> lock(this->transaction_mutex); + TRANSACTION_SYNCRONIZED_BLOCK if (this->status.GetCode() == StatusCode::EXECUTING) { CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(), @@ -236,15 +243,15 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { } this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { - // this indicates an error during the upload - return this->status.GetCode(); + // Return early, there has been an error. + return; } } } if (this->status.GetCode() == StatusCode::EXECUTING) { { - const std::lock_guard<std::mutex> lock(this->transaction_mutex); + TRANSACTION_SYNCRONIZED_BLOCK if (this->status.GetCode() == StatusCode::EXECUTING) { CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), @@ -273,7 +280,7 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { for (const auto &item : download_files) { auto file_descriptor(item.second); { - const std::lock_guard<std::mutex> lock(this->transaction_mutex); + TRANSACTION_SYNCRONIZED_BLOCK if (this->status.GetCode() == StatusCode::EXECUTING) { CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path; @@ -284,19 +291,17 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { // this indicates an error during the download - break; - return this->status.GetCode(); + return; } } } if (this->status.GetCode() == StatusCode::EXECUTING) { - return ProcessTerminated().GetCode(); + ProcessTerminated(); } - return this->status.GetCode(); } auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { - const std::lock_guard<std::mutex> lock(this->transaction_mutex); + TRANSACTION_SYNCRONIZED_BLOCK if (this->status.GetCode() != StatusCode::READY && this->status.GetCode() != StatusCode::GO_ON) { return StatusCode::TRANSACTION_STATUS_ERROR; } @@ -312,8 +317,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { } this->status = TransactionStatus::EXECUTING(); - this->transaction_future = - std::async(std::launch::async, [this]() { return this->DoExecuteTransaction(); }); + this->transaction_future = std::async(std::launch::async, [this]() { DoExecuteTransaction(); }); return StatusCode::EXECUTING; } @@ -353,7 +357,7 @@ auto Transaction::ProcessRetrieveResponse(RetrieveResponse *retrieve_response, return result; } -auto Transaction::ProcessTerminated() const noexcept -> TransactionStatus { +auto Transaction::ProcessTerminated() const noexcept -> void { CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessTerminated()") bool set_error = false; auto *responses = this->response->mutable_responses(); @@ -409,8 +413,6 @@ auto Transaction::ProcessTerminated() const noexcept -> TransactionStatus { } else { this->status = TransactionStatus::SUCCESS(); } - - return this->status; } auto Transaction::WaitForIt() const noexcept -> TransactionStatus { @@ -485,7 +487,18 @@ Transaction::~Transaction() { void Transaction::Cancel() { CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::Cancel()") - const std::lock_guard<std::mutex> lock(this->transaction_mutex); + + TRANSACTION_SYNCRONIZED_BLOCK + + if (this->status.GetCode() > 0) { + // Prevent canceling before the queue even started. + // Temporary fix for a bug in GRPC. + // Fix is in the making: + // https://github.com/grpc/grpc/pull/30004 + using namespace std::chrono_literals; + std::this_thread::sleep_for(5ms); + } + this->status = TransactionStatus::CANCELLED(); if (handler_ != nullptr) { diff --git a/test/test_entity.cpp b/test/test_entity.cpp index 5f5aaaab4a5f6727f38ea30c0205ea8d79a925b1..a42e91781922c5e8b9530663b5bdbc53d4532fe3 100644 --- a/test/test_entity.cpp +++ b/test/test_entity.cpp @@ -39,6 +39,7 @@ #include <stdexcept> // for out_of_range #include <string> // for operator+, to_string #include <utility> // for move +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::entity { using caosdb::entity::v1::IdResponse; diff --git a/test/test_issues.cpp b/test/test_issues.cpp index 3e92f4e71d3055575058dcfba5ff3a9ab9de8596..b7a6cc8fe7df2bbcf8b2db0902c57c7146b77faf 100644 --- a/test/test_issues.cpp +++ b/test/test_issues.cpp @@ -42,6 +42,8 @@ TEST(test_issues, test_issue_11) { EXPECT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously()); // Trying to obtain ResultSet while it is still empty. EXPECT_EQ(transaction->GetResultSet().size(), 0); + transaction->WaitForIt(); + EXPECT_EQ(transaction->GetResultSet().size(), 0); } } // namespace caosdb::transaction diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index df87f74314ed88a7609aa27265d6bbc20edb36d1..e710b6c775615968b567b6efa64ba76e56717c69 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -37,6 +37,7 @@ #include <string> // for string, basic_string #include <utility> // for move #include <vector> // for vector +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::transaction { using caosdb::configuration::InsecureConnectionConfiguration; @@ -265,6 +266,7 @@ TEST(test_transaction, test_multiple_execute) { EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); + transaction->Cancel(); } TEST(test_transaction, test_multiple_wait_for_it) {