diff --git a/conanfile.py b/conanfile.py index a773f501212cb23d3649aae31c644bd66a259516..c91038d2d9ab6e2f23c21696bd62d19c0421ca78 100644 --- a/conanfile.py +++ b/conanfile.py @@ -14,7 +14,7 @@ class CaosdbConan(ConanFile): default_options = {"shared": False, "fPIC": True} generators = "cmake" requires = [ - ("grpc/1.39.1"), + ("grpc/1.45.2"), ] build_requires = [ ("boost/1.77.0"), diff --git a/doc/CHANGELOG.md b/doc/CHANGELOG.md index fd3083fe290b51f78c12838c94bb3cf5b7a5a066..adbf07c79763f4af236ac90eff74e44669ec7f02 100644 --- a/doc/CHANGELOG.md +++ b/doc/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +* Transaction::ExecuteAsynchronously is actually asynchronous now. * Removed boost from the headers. Boost is only a build dependency from now on. ### Deprecated diff --git a/include/caosdb/authentication.h b/include/caosdb/authentication.h index 8e98a17e6d94fe118cf2fc478efbbf77733be4a6..2e117265d71f660888d6fd76d5fe3a3f898cac95 100644 --- a/include/caosdb/authentication.h +++ b/include/caosdb/authentication.h @@ -27,15 +27,15 @@ * @date 2021-06-28 * @brief Configuration and setup of the client authentication. */ -#include "caosdb/utility.h" // for base64_encode -#include <grpcpp/impl/codegen/interceptor.h> // for Status -#include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext -#include <grpcpp/impl/codegen/status.h> // for Status -#include <grpcpp/impl/codegen/string_ref.h> // for string_ref -#include <grpcpp/security/credentials.h> // for CallCredentials -#include <map> // for multimap -#include <memory> // for shared_ptr -#include <string> // for string +#include "caosdb/utility.h" // for base64_encode +#include <grpcpp/security/auth_context.h> // for AuthContext +#include <grpcpp/security/credentials.h> // for CallCredentials +#include <grpcpp/support/interceptor.h> // for Status +#include <grpcpp/support/status.h> // for Status +#include <grpcpp/support/string_ref.h> // for string_ref +#include <map> // for multimap +#include <memory> // for shared_ptr +#include <string> // for string namespace caosdb { namespace authentication { diff --git a/include/caosdb/entity.h b/include/caosdb/entity.h index 52a69e05083670a302a71b883993ae28122e7655..23b6e2f612a52e85df9eb2682b6cac11fd594063 100644 --- a/include/caosdb/entity.h +++ b/include/caosdb/entity.h @@ -47,6 +47,7 @@ #include <string> // for string, basic... #include <utility> // for move #include <vector> // for vector +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::entity { using caosdb::entity::v1::IdResponse; diff --git a/include/caosdb/file_transmission/download_request_handler.h b/include/caosdb/file_transmission/download_request_handler.h index cd8542bef873169c363a43e8c727a7323f3c64bc..965db6419de180f46e1a712d4a4d5795857599ab 100644 --- a/include/caosdb/file_transmission/download_request_handler.h +++ b/include/caosdb/file_transmission/download_request_handler.h @@ -84,7 +84,11 @@ public: DownloadRequestHandler(DownloadRequestHandler &&) = delete; DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete; - void Start() override { OnNext(true); } + void Start() override { + if (state_ == CallState::NewCall) { + OnNext(true); + } + } bool OnNext(bool ok) override; diff --git a/include/caosdb/file_transmission/upload_request_handler.h b/include/caosdb/file_transmission/upload_request_handler.h index 193489e21c2c69d4c4df564cc6ad2c71d200cc8f..099e39e62a087111308f7afdf77631a3e42c5d09 100644 --- a/include/caosdb/file_transmission/upload_request_handler.h +++ b/include/caosdb/file_transmission/upload_request_handler.h @@ -81,7 +81,11 @@ public: UploadRequestHandler(UploadRequestHandler &&) = delete; UploadRequestHandler &operator=(UploadRequestHandler &&) = delete; - void Start() override { OnNext(true); } + void Start() override { + if (state_ == CallState::NewCall) { + OnNext(true); + } + } bool OnNext(bool ok) override; diff --git a/include/caosdb/logging.h b/include/caosdb/logging.h index 263151fd21674d1d801439c7c4b4d13a0837043a..e50c70f69bc0ed45e8f6a60f78cd11e4142b0568 100644 --- a/include/caosdb/logging.h +++ b/include/caosdb/logging.h @@ -25,8 +25,8 @@ #include "caosdb/log_level.h" // for CAOSDB_LOG_... #include <cstdint> // for uint64_t +#include <iosfwd> // for streamsize #include <memory> // for shared_ptr -#include <ostream> // for ostream #include <string> // for string #include <vector> // for vector @@ -53,6 +53,30 @@ private: int level; }; +/** + * Helper class for logging the entering and leaving of a function or method. + * + * Please Use the macro + * + * CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, function_name); + */ +class TraceEnterLeaveLogger { +public: + inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name) + : channel(channel), function_name(function_name) { + caosdb::logging::LoggerOutputStream::get(this->channel, CAOSDB_LOG_LEVEL_TRACE) + << "Enter " << this->function_name; + } + inline ~TraceEnterLeaveLogger() { + caosdb::logging::LoggerOutputStream::get(this->channel, CAOSDB_LOG_LEVEL_TRACE) + << "Leave " << this->function_name; + } + +private: + const std::string &channel; + const std::string function_name; +}; + /** * This class stores the integer log level. */ @@ -210,6 +234,9 @@ void caosdb_log_trace(const char *channel, const char *msg); #define CAOSDB_LOG_TRACE(Channel) \ caosdb::logging::LoggerOutputStream::get(Channel, CAOSDB_LOG_LEVEL_TRACE) +#define CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(Channel, FunctionName) \ + const caosdb::logging::TraceEnterLeaveLogger trace_enter_leave_logger(Channel, FunctionName); + #define CAOSDB_LOG_ERROR_AND_RETURN_STATUS(Channel, StatusCode, Message) \ CAOSDB_LOG_ERROR(Channel) << "StatusCode (" << StatusCode << ") " \ << caosdb::get_status_description(StatusCode) << ": " << Message; \ diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index ae0361276eb0ad1d5f282a0068ae99f9418b2f7c..e73840e4f2aa355961f653b24817784a2773313f 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -21,24 +21,26 @@ #ifndef CAOSDB_TRANSACTION_H #define CAOSDB_TRANSACTION_H -#include "caosdb/entity.h" // for Entity, FileDe... -#include "caosdb/entity/v1/main.grpc.pb.h" // for EntityTransact... -#include "caosdb/entity/v1/main.pb.h" // for MultiTransacti... -#include "caosdb/file_descriptor.h" // for FileDescriptor -#include "caosdb/handler_interface.h" // for HandlerInterface -#include "caosdb/transaction_handler.h" // for EntityTransactionHandler -#include "caosdb/logging.h" // for CAOSDB_LOG_ERR... -#include "caosdb/protobuf_helper.h" // for get_arena -#include "caosdb/status_code.h" // for StatusCode -#include "caosdb/transaction_status.h" // for StatusCode -#include <google/protobuf/arena.h> // for Arena -#include <google/protobuf/util/json_util.h> // for MessageToJsonS... -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <algorithm> // for max -#include <iterator> // for iterator, next -#include <map> // for map +#include "caosdb/entity.h" // for Entity, FileDe... +#include "caosdb/entity/v1/main.grpc.pb.h" // for EntityTransact... +#include "caosdb/entity/v1/main.pb.h" // for MultiTransacti... +#include "caosdb/file_descriptor.h" // for FileDescriptor +#include "caosdb/handler_interface.h" // for HandlerInterface +#include "caosdb/transaction_handler.h" // for EntityTransactionHandler +#include "caosdb/logging.h" // for CAOSDB_LOG_ERR... +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for StatusCode +#include "caosdb/transaction_status.h" // for StatusCode +#include <algorithm> // for max +#include <future> // for async, future +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/util/json_util.h> // for MessageToJsonS... +#include <grpcpp/completion_queue.h> // for CompletionQueue +#include <iterator> // for iterator, next +#include <map> // for map // IWYU pragma: no_include <ext/alloc_traits.h> #include <memory> // for unique_ptr +#include <mutex> // for mutex #include <string> // for string #include <utility> // for move #include <vector> // for vector @@ -48,7 +50,8 @@ * query) can be added as a sub-request to a transaction. */ #define ASSERT_CAN_ADD_RETRIEVAL \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -84,7 +87,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_DELETION \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -106,7 +110,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_INSERTION \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -128,7 +133,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_UPDATE \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -174,6 +180,7 @@ using caosdb::entity::v1::MultiTransactionResponse; using caosdb::entity::v1::RegisterFileUploadRequest; using caosdb::entity::v1::RegisterFileUploadResponse; using caosdb::transaction::TransactionStatus; +using RetrieveResponse = caosdb::entity::v1::RetrieveResponse; using TransactionResponseCase = caosdb::entity::v1::TransactionResponse::TransactionResponseCase; using caosdb::utility::get_arena; using google::protobuf::Arena; @@ -361,10 +368,6 @@ public: */ auto DeleteById(const std::string &id) noexcept -> StatusCode; - inline auto IsStatus(const TransactionStatus &status) const noexcept -> bool { - return this->status.GetCode() == status.GetCode(); - }; - /** * Execute this transaction in blocking mode and return the status. */ @@ -375,8 +378,7 @@ public: * * A client may request the current status at any time via GetStatus(). * - * Use WaitForIt() to join the back-ground execution of this transaction, otherwise the behaviour - * of getting the ResultSet is undefined. + * Use WaitForIt() to join the back-ground execution of this transaction. */ auto ExecuteAsynchronously() noexcept -> StatusCode; @@ -387,7 +389,7 @@ public: * Use this after ExecuteAsynchronously(), otherwise the TransactionStatus still remains * EXECUTING. */ - [[nodiscard]] auto WaitForIt() const noexcept -> TransactionStatus; + auto WaitForIt() const noexcept -> TransactionStatus; /** * Return the current status of the transaction. @@ -398,12 +400,11 @@ public: * Return the ResultSet of this transaction. * * Note: If this method is called before the transaction has terminated - * (GetStatus().GetCode() < 0) this method has undefined behavior. - * - * Instead, do Execute() or WaitForIt() and only call this method afterwards. + * successfully (as indicated by GetStatus().GetCode == 0) this method has + * undefined behavior. */ [[nodiscard]] inline auto GetResultSet() const noexcept -> const ResultSet & { - if (this->result_set == nullptr) { + if (this->GetStatus().GetCode() < 0) { CAOSDB_LOG_ERROR(logger_name) << "GetResultSet was called before the transaction has terminated. This is a programming " "error of the code which uses the transaction."; @@ -411,7 +412,12 @@ public: // terminates and the result_set is being overriden, the unique_ptr // created here will be deleted and any client of the return ResultSet // will have a SegFault. - this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>()); + } else if (this->GetStatus().GetCode() == StatusCode::SPOILED) { + CAOSDB_LOG_ERROR(logger_name) + << "GetResultSet was called on a \"spoiled\" transaction. That means " + "that the result set has already been released via " + "ReleaseResultSet(). This is a programming error of the code which " + "uses the transaction."; } return *(this->result_set.get()); } @@ -428,7 +434,9 @@ public: */ [[nodiscard]] inline auto ReleaseResultSet() noexcept -> const ResultSet * { this->status = TransactionStatus::SPOILED(); - return this->result_set.release(); + auto result_set = this->result_set.release(); + this->result_set = std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>()); + return result_set; } /** @@ -481,22 +489,21 @@ public: */ inline auto GetUploadFiles() const -> const std::vector<FileDescriptor> & { return upload_files; } -protected: /** - * Await and process the current handler's results. + * Cancel this transaction after it has been started with + * ExecuteAsynchronously(). * - * This implies consecutive calls to the handler's OnNext function. + * This will cancel any active handler and drains the completion_queue. */ - auto ProcessCalls() -> TransactionStatus; + auto Cancel() -> void; +protected: /** - * Cancels any active handler and drains the completion_queue. + * Await and process the current handler's results. * - * Can stay protected until ExecuteAsynchronously() is actually asynchronous. - * Then it is also intended for aborting an execution after it has already - * started. + * This implies consecutive calls to the handler's OnNext function. */ - auto Cancel() -> void; + auto ProcessCalls() -> TransactionStatus; /** * Return the Arena where this transaction may create Message instances. @@ -508,6 +515,22 @@ protected: inline auto GetArena() const -> Arena * { return get_arena(); } private: + /** + * Await and process the termination of this transaction. + * + * To be called at the end of DoExecuteTransaction on success. + */ + auto ProcessTerminated() const noexcept -> void; + auto ProcessRetrieveResponse(RetrieveResponse *retrieve_response, + std::vector<std::unique_ptr<Entity>> *entities, + bool *set_error) const noexcept -> std::unique_ptr<Entity>; + /** + * This functions actually does all the work. Also, it is the one which is + * going to be send to the background thread by ExecuteAsynchronously. + */ + auto DoExecuteTransaction() noexcept -> void; + mutable std::mutex transaction_mutex; + mutable std::future<void> transaction_future; grpc::CompletionQueue completion_queue; std::unique_ptr<HandlerInterface> handler_; @@ -516,7 +539,6 @@ private: bool has_query = false; TransactionType transaction_type = TransactionType::NONE; - mutable std::unique_ptr<ResultSet> result_set; mutable TransactionStatus status = TransactionStatus::INITIAL(); std::shared_ptr<EntityTransactionService::Stub> entity_service; std::shared_ptr<FileTransmissionService::Stub> file_service; @@ -524,6 +546,7 @@ private: mutable MultiTransactionResponse *response; std::string error_message; mutable long query_count; + mutable std::unique_ptr<ResultSet> result_set; }; template <class InputIterator> diff --git a/include/caosdb/transaction_status.h b/include/caosdb/transaction_status.h index bdfd65595090acde2b177dd7486492738901b0e0..d46f3911aa17fbf518fd43b91e3568e4e4740c2f 100644 --- a/include/caosdb/transaction_status.h +++ b/include/caosdb/transaction_status.h @@ -92,6 +92,13 @@ public: * This status means that the transaction has been executed successfully. */ CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(SUCCESS, StatusCode::SUCCESS) + /** + * Factory for a CANCELLED status. + * + * This status means that the transaction has been canceled and should not be + * used anymore. + */ + CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(CANCELLED, StatusCode::CANCELLED) /** * Factory for a CONNECTION_ERROR status. * diff --git a/include/caosdb/unary_rpc_handler.h b/include/caosdb/unary_rpc_handler.h index 0d35eec0f03d546b70ae5c1c59aa057896bb575b..a505a8724d67b5434b3535e15bc1ccb9eb635de4 100644 --- a/include/caosdb/unary_rpc_handler.h +++ b/include/caosdb/unary_rpc_handler.h @@ -63,8 +63,10 @@ public: : HandlerInterface(), state_(CallState::NewCall), completion_queue(completion_queue) {} void Start() override { - transaction_status = TransactionStatus::EXECUTING(); - OnNext(true); + if (state_ == CallState::NewCall) { + transaction_status = TransactionStatus::EXECUTING(); + OnNext(true); + } } bool OnNext(bool ok) override; diff --git a/src/caosdb/authentication.cpp b/src/caosdb/authentication.cpp index ecfbe35344f557671d8529770f73a3987c389ab0..7faa8619b4153c8ff3c03549369b7d976a62e035 100644 --- a/src/caosdb/authentication.cpp +++ b/src/caosdb/authentication.cpp @@ -19,15 +19,11 @@ * */ #include "caosdb/authentication.h" -#include <grpcpp/impl/codegen/interceptor.h> // for Status -#include <grpcpp/impl/codegen/security/auth_context.h> // for AuthContext -#include <grpcpp/impl/codegen/status.h> // for Status, Status::OK -#include <grpcpp/impl/codegen/string_ref.h> // for string_ref -#include <grpcpp/security/credentials.h> // for MetadataCredentialsPlugin -#include <map> // for multimap -#include <memory> // for allocator, shared_ptr -#include <string> // for basic_string, operator+ -#include <utility> // for pair, move, make_pair +#include <grpcpp/security/credentials.h> // for MetadataCredentialsPlugin +#include <map> // for multimap +#include <memory> // for allocator, shared_ptr +#include <string> // for basic_string, operator+ +#include <utility> // for pair, move, make_pair namespace caosdb::authentication { using caosdb::utility::base64_encode; diff --git a/src/caosdb/connection.cpp b/src/caosdb/connection.cpp index f13100c4dd8cf5dfe44f6c511e6cf1e0cda3c688..cf019dab920ae24f17cb2002d22533c1aa69ee85 100644 --- a/src/caosdb/connection.cpp +++ b/src/caosdb/connection.cpp @@ -30,10 +30,10 @@ #include "caosdb/info/v1/main.pb.h" // for GetVersionInfoRequest #include "caosdb/transaction.h" // for Transaction #include "caosdb/transaction_status.h" // for TransactionStatus -#include <grpcpp/create_channel.h> // for CreateChannel -#include <grpcpp/impl/codegen/client_context.h> // for ClientContext -#include <grpcpp/impl/codegen/status.h> // for Status -#include <grpcpp/impl/codegen/status_code_enum.h> // for StatusCode, UNAUTH... +#include <grpcpp/client_context.h> // for ClientContext +#include <grpcpp/create_channel.h> // for CreateChannel +#include <grpcpp/support/status.h> // for Status +#include <grpcpp/support/status_code_enum.h> // for StatusCode, UNAUTHENTIC... #include <string> // for string, operator+ namespace caosdb::connection { diff --git a/src/caosdb/file_transmission/download_request_handler.cpp b/src/caosdb/file_transmission/download_request_handler.cpp index 6d2df655acde2cec564c1ac6bf308bac2c9eca82..e079d28390b59cd439efa7476c5df672a63c1ecc 100644 --- a/src/caosdb/file_transmission/download_request_handler.cpp +++ b/src/caosdb/file_transmission/download_request_handler.cpp @@ -116,7 +116,10 @@ bool DownloadRequestHandler::OnNext(bool ok) { return true; } -void DownloadRequestHandler::Cancel() { ctx_.TryCancel(); } +void DownloadRequestHandler::Cancel() { + state_ = CallState::CallComplete; + ctx_.TryCancel(); +} void DownloadRequestHandler::handleNewCallState() { CAOSDB_LOG_TRACE(logger_name) << "Enter DownloadRequestHandler::handleNewCallState. local_path = " diff --git a/src/caosdb/file_transmission/upload_request_handler.cpp b/src/caosdb/file_transmission/upload_request_handler.cpp index 2f2d2c08db213bea30a3b2635b15d7eb38e249a2..e4fb985b70b73e25869f54e635cb089901bb41a1 100644 --- a/src/caosdb/file_transmission/upload_request_handler.cpp +++ b/src/caosdb/file_transmission/upload_request_handler.cpp @@ -119,7 +119,10 @@ bool UploadRequestHandler::OnNext(bool ok) { return true; } -void UploadRequestHandler::Cancel() { ctx_.TryCancel(); } +void UploadRequestHandler::Cancel() { + state_ = CallState::CallComplete; + ctx_.TryCancel(); +} void UploadRequestHandler::handleNewCallState() { auto filename = file_descriptor_.local_path; diff --git a/src/caosdb/logging.cpp b/src/caosdb/logging.cpp index cdd505877850bc0adc06b15a85f80fdc9ce3a653..7aaedee77696ada264418c104b6c8c28c4bb7b24 100644 --- a/src/caosdb/logging.cpp +++ b/src/caosdb/logging.cpp @@ -40,7 +40,6 @@ #include <boost/smart_ptr/shared_ptr.hpp> #include <cstdint> // for uint64_t #include <memory> -#include <ostream> // for ostream #include <sstream> #include <string> #include <utility> // for move diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 939e5e8d6e55a533d78c826f513ffea527101e0c..740945faeb2fc91ee43129165c24d630015ee365 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -29,16 +29,22 @@ #include "caosdb/status_code.h" // for StatusCode #include "caosdb/transaction_handler.h" // for EntityTransactionHandler #include <algorithm> // for max + // +#include <chrono> // for chrono_literals +#include <exception> // IWYU pragma: keep +#include <filesystem> // for operator<<, path +#include <future> // for async, future +#include <google/protobuf/arena.h> // for Arena +#include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec +#include <map> // for map, operator!= +#include <memory> // for unique_ptr +#include <random> // for mt19937, rand... +#include <system_error> // for std::system_error +#include <thread> // for sleep +#include <utility> // for move, pair // IWYU pragma: no_include <bits/exception.h> -#include <exception> // IWYU pragma: keep -#include <filesystem> // for operator<<, path -#include <google/protobuf/arena.h> // for Arena -#include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <map> // for map, operator!= -#include <memory> // for unique_ptr -#include <random> // for mt19937, rand... -#include <utility> // for move, pair +// IWYU pragma: no_include <cxxabi.h> +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::transaction { using caosdb::entity::v1::EntityTransactionService; @@ -47,8 +53,10 @@ using caosdb::entity::v1::MultiTransactionRequest; using caosdb::entity::v1::MultiTransactionResponse; using TransactionResponseCase = caosdb::entity::v1::TransactionResponse::TransactionResponseCase; using RetrieveResponseCase = caosdb::entity::v1::RetrieveResponse::RetrieveResponseCase; +using RetrieveResponse = caosdb::entity::v1::RetrieveResponse; using ProtoEntity = caosdb::entity::v1::Entity; using caosdb::entity::v1::EntityRequest; + using google::protobuf::Arena; using NextStatus = grpc::CompletionQueue::NextStatus; using RegistrationStatus = caosdb::entity::v1::RegistrationStatus; @@ -86,7 +94,8 @@ Transaction::Transaction(std::shared_ptr<EntityTransactionService::Stub> entity_ std::shared_ptr<FileTransmissionService::Stub> file_service) : entity_service(std::move(entity_service)), file_service(std::move(file_service)), request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())), - response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1) {} + response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())), query_count(-1), + result_set(std::make_unique<MultiResultSet>(std::vector<std::unique_ptr<Entity>>())) {} auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode { ASSERT_CAN_ADD_RETRIEVAL @@ -194,59 +203,62 @@ auto Transaction::Execute() -> TransactionStatus { return status; } -// TODO(tf) This has apparently a cognitive complexity of 39>25 (threshold). -auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT - if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) { - return StatusCode::TRANSACTION_STATUS_ERROR; - } - switch (this->transaction_type) { - case MIXED_READ_AND_WRITE: - CAOSDB_LOG_ERROR_AND_RETURN_STATUS( - logger_name, StatusCode::UNSUPPORTED_FEATURE, - "MIXED_READ_AND_WRITE UNSUPPORTED: The current implementation does not support " - "mixed read and write transactions (containing retrievals, insertions, " - "deletions, and updates in one transaction).") - default: - break; - } - this->status = TransactionStatus::EXECUTING(); +// NOLINTNEXTLINE +#define TRANSACTION_SYNCRONIZED_BLOCK \ + const std::lock_guard<std::mutex> lock(this->transaction_mutex); +// NOLINTNEXTLINE +auto Transaction::DoExecuteTransaction() noexcept -> void { // upload files first if (!upload_files.empty()) { CAOSDB_LOG_INFO(logger_name) << "Number of files to be uploaded: " << upload_files.size(); - auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(GetArena()); - auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(GetArena()); + auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(get_arena()); + auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); - handler_ = - std::make_unique<RegisterFileUploadHandler>(&handler_, file_service.get(), &completion_queue, - registration_request, registration_response); + { + TRANSACTION_SYNCRONIZED_BLOCK + if (this->status.GetCode() == StatusCode::EXECUTING) { + handler_ = std::make_unique<RegisterFileUploadHandler>( + &handler_, file_service.get(), &completion_queue, registration_request, + registration_response); + } + } this->status = ProcessCalls(); if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { this->status = TransactionStatus::FILE_UPLOAD_ERROR(); - return StatusCode::EXECUTING; } for (auto &file_descriptor : upload_files) { file_descriptor.file_transmission_id->set_registration_id( registration_response->registration_id()); - CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; - handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(), - &completion_queue, file_descriptor); + { + TRANSACTION_SYNCRONIZED_BLOCK + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; + handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(), + &completion_queue, file_descriptor); + } + } this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { - return StatusCode::EXECUTING; + // Return early, there has been an error. + return; } } } - CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); - handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), - &completion_queue, request, response); - this->status = ProcessCalls(); - if (this->status.GetCode() != StatusCode::EXECUTING) { - return StatusCode::EXECUTING; + if (this->status.GetCode() == StatusCode::EXECUTING) { + { + TRANSACTION_SYNCRONIZED_BLOCK + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); + handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), + &completion_queue, request, response); + } + } + this->status = ProcessCalls(); } // file download afterwards @@ -267,67 +279,98 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT for (const auto &item : download_files) { auto file_descriptor(item.second); - CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path; + { + TRANSACTION_SYNCRONIZED_BLOCK + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path; - handler_ = std::make_unique<DownloadRequestHandler>(&handler_, file_service.get(), - &completion_queue, file_descriptor); + handler_ = std::make_unique<DownloadRequestHandler>(&handler_, file_service.get(), + &completion_queue, file_descriptor); + } + } this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { - return StatusCode::EXECUTING; + // this indicates an error during the download + return; } } } + if (this->status.GetCode() == StatusCode::EXECUTING) { + ProcessTerminated(); + } +} + +auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { + TRANSACTION_SYNCRONIZED_BLOCK + if (this->status.GetCode() != StatusCode::READY && this->status.GetCode() != StatusCode::GO_ON) { + return StatusCode::TRANSACTION_STATUS_ERROR; + } + switch (this->transaction_type) { + case MIXED_READ_AND_WRITE: + CAOSDB_LOG_ERROR_AND_RETURN_STATUS( + logger_name, StatusCode::UNSUPPORTED_FEATURE, + "MIXED_WRITE UNSUPPORTED: The current implementation does not support " + "mixed read and write transactions (containing retrievals, insertions, " + "deletions, and updates in one transaction).") + default: + break; + } + this->status = TransactionStatus::EXECUTING(); + + this->transaction_future = std::async(std::launch::async, [this]() { DoExecuteTransaction(); }); + return StatusCode::EXECUTING; } -// TODO(tf) This has apparently a cognitive complexity of 36>25 (threshold). -auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT - if (this->status.GetCode() != StatusCode::EXECUTING) { - return this->status; +auto Transaction::ProcessRetrieveResponse(RetrieveResponse *retrieve_response, + std::vector<std::unique_ptr<Entity>> *entities, + bool *set_error) const noexcept + -> std::unique_ptr<Entity> { + std::unique_ptr<Entity> result; + switch (retrieve_response->retrieve_response_case()) { + case RetrieveResponseCase::kEntityResponse: { + auto *retrieve_entity_response = retrieve_response->release_entity_response(); + result = std::make_unique<Entity>(retrieve_entity_response); + } break; + case RetrieveResponseCase::kSelectResult: { + CAOSDB_LOG_ERROR(logger_name) << "Results of a SELECT query cannot be " + "processed by this client yet."; + // TODO(tf) Select queries + } break; + case RetrieveResponseCase::kCountResult: { + this->query_count = retrieve_response->count_result(); + } break; + case RetrieveResponseCase::kFindResult: { + std::unique_ptr<Entity> find_result; + for (auto &entity_response : *retrieve_response->mutable_find_result()->mutable_result_set()) { + find_result = std::make_unique<Entity>(&entity_response); + if (find_result->HasErrors()) { + *set_error = true; + } + entities->push_back(std::move(find_result)); + } + } break; + default: + CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase."; + break; } - this->status = TransactionStatus::SUCCESS(); + return result; +} + +auto Transaction::ProcessTerminated() const noexcept -> void { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessTerminated()") bool set_error = false; auto *responses = this->response->mutable_responses(); std::vector<std::unique_ptr<Entity>> entities; + for (auto &sub_response : *responses) { std::unique_ptr<Entity> result; switch (sub_response.transaction_response_case()) { - case TransactionResponseCase::kRetrieveResponse: { auto *retrieve_response = sub_response.mutable_retrieve_response(); - - switch (retrieve_response->retrieve_response_case()) { - case RetrieveResponseCase::kEntityResponse: { - auto *retrieve_entity_response = retrieve_response->mutable_entity_response(); - result = std::make_unique<Entity>(retrieve_entity_response); - } break; - case RetrieveResponseCase::kSelectResult: { - CAOSDB_LOG_ERROR(logger_name) << "Results of a SELECT query cannot be " - "processed by this client yet."; - // TODO(tf) Select queries - } break; - case RetrieveResponseCase::kCountResult: { - this->query_count = retrieve_response->count_result(); - } break; - case RetrieveResponseCase::kFindResult: { - std::unique_ptr<Entity> find_result; - for (auto &entity_response : - *retrieve_response->mutable_find_result()->mutable_result_set()) { - find_result = std::make_unique<Entity>(&entity_response); - if (find_result->HasErrors()) { - set_error = true; - } - entities.push_back(std::move(find_result)); - } - } break; - default: - CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase."; - break; - } - + result = ProcessRetrieveResponse(retrieve_response, &entities, &set_error); break; // break TransactionResponseCase::kRetrieveResponse } - case TransactionResponseCase::kInsertResponse: { auto *inserted_id_response = sub_response.mutable_insert_response()->mutable_id_response(); result = std::make_unique<Entity>(inserted_id_response); @@ -367,12 +410,31 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT if (set_error) { this->status = TransactionStatus::TRANSACTION_ERROR("The request terminated with errors."); + } else { + this->status = TransactionStatus::SUCCESS(); + } +} + +auto Transaction::WaitForIt() const noexcept -> TransactionStatus { + if (this->status.GetCode() != StatusCode::EXECUTING) { + return this->status; } + this->transaction_future.wait(); + return this->status; } +// NOLINTNEXTLINE auto Transaction::ProcessCalls() -> TransactionStatus { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessCalls()") + if (this->status.GetCode() != StatusCode::EXECUTING) { + CAOSDB_LOG_ERROR(logger_name) + << "Transaction::ProcessCalls() was called, TransactionStatus was: " + << std::to_string(this->status.GetCode()) << " - " << this->status.GetDescription(); + return status; + } + gpr_timespec deadline; deadline.tv_sec = 1; deadline.tv_nsec = 0; @@ -380,6 +442,7 @@ auto Transaction::ProcessCalls() -> TransactionStatus { TransactionStatus result = TransactionStatus::EXECUTING(); handler_->Start(); + void *tag = nullptr; bool ok = false; while (true) { @@ -390,20 +453,17 @@ auto Transaction::ProcessCalls() -> TransactionStatus { if (!res) { // The handler has finished it's work result = handler_->GetStatus(); - handler_.reset(); return result; } } else { std::string description("Invalid tag delivered by notification queue."); CAOSDB_LOG_ERROR(logger_name) << description; - handler_.reset(); return TransactionStatus::RPC_ERROR(description); } } break; case NextStatus::SHUTDOWN: { CAOSDB_LOG_ERROR(logger_name) << "Notification queue has been shut down unexpectedly."; result = handler_->GetStatus(); - handler_.reset(); return result; } break; case NextStatus::TIMEOUT: { @@ -412,17 +472,37 @@ auto Transaction::ProcessCalls() -> TransactionStatus { default: CAOSDB_LOG_FATAL(logger_name) << "Got an invalid NextStatus from CompletionQueue."; result = handler_->GetStatus(); - handler_.reset(); return result; } } + result = handler_->GetStatus(); - handler_.reset(); return result; } Transaction::~Transaction() { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::~Transaction()") this->Cancel(); +} + +void Transaction::Cancel() { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::Cancel()") + + TRANSACTION_SYNCRONIZED_BLOCK + + if (this->status.GetCode() > 0) { + // Prevent canceling before the queue even started. + // Temporary fix for a bug in GRPC. + // Fix is in the making: + // https://github.com/grpc/grpc/pull/30004 + using namespace std::chrono_literals; + std::this_thread::sleep_for(5ms); + } + + this->status = TransactionStatus::CANCELLED(); + if (handler_ != nullptr) { + handler_->Cancel(); + } completion_queue.Shutdown(); @@ -432,12 +512,9 @@ Transaction::~Transaction() { while (completion_queue.Next(&ignoredTag, &ok)) { ; } -} -void Transaction::Cancel() { - // TODO(tf) State Canceled - if (handler_ != nullptr) { - handler_->Cancel(); + if (transaction_future.valid()) { + transaction_future.wait(); } } diff --git a/src/caosdb/transaction_handler.cpp b/src/caosdb/transaction_handler.cpp index fd2b527e47dc411290ef05d629899c335788bc36..cbcbf76332f437712ea3883be72c347370f75924 100644 --- a/src/caosdb/transaction_handler.cpp +++ b/src/caosdb/transaction_handler.cpp @@ -16,17 +16,13 @@ EntityTransactionHandler::EntityTransactionHandler(HandlerTag tag, response_(response) {} void EntityTransactionHandler::handleNewCallState() { - CAOSDB_LOG_TRACE(logger_name) << "Enter EntityTransactionHandler::handleNewCallState with " - "CompletionQueue " - << completion_queue; + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "EntityTransactionHandler::handleNewCallState()") rpc_ = stub_->PrepareAsyncMultiTransaction(&call_context, *request_, completion_queue); state_ = CallState::CallComplete; rpc_->StartCall(); rpc_->Finish(response_, &status_, tag_); - - CAOSDB_LOG_TRACE(logger_name) << "Leave EntityTransactionHandler::handleNewCallState"; } } // namespace caosdb::transaction diff --git a/src/caosdb/unary_rpc_handler.cpp b/src/caosdb/unary_rpc_handler.cpp index 86320ca2a94b0b67bd362e0e76fe0f31ddd78007..6770cae1981fa193105d5af9af4934d90a6b0aca 100644 --- a/src/caosdb/unary_rpc_handler.cpp +++ b/src/caosdb/unary_rpc_handler.cpp @@ -59,6 +59,7 @@ namespace caosdb::transaction { bool UnaryRpcHandler::OnNext(bool ok) { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "UnaryRpcHandler::OnNext(bool)") try { if (ok) { if (state_ == CallState::NewCall) { @@ -99,10 +100,14 @@ bool UnaryRpcHandler::OnNext(bool ok) { return false; } -void UnaryRpcHandler::Cancel() { call_context.TryCancel(); } +void UnaryRpcHandler::Cancel() { + state_ = CallState::CallComplete; + transaction_status = TransactionStatus::CANCELLED(); + call_context.TryCancel(); +} void UnaryRpcHandler::handleCallCompleteState() { - CAOSDB_LOG_TRACE(logger_name) << "Enter UnaryRpcHandler::handleCallCompleteState"; + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "UnaryRpcHandler::handleCallCompleteState()") switch (status_.error_code()) { case grpc::OK: @@ -117,8 +122,6 @@ void UnaryRpcHandler::handleCallCompleteState() { << "): " << description; break; } - - CAOSDB_LOG_TRACE(logger_name) << "Leave UnaryRpcHandler::handleCallCompleteState"; } } // namespace caosdb::transaction diff --git a/test/test_entity.cpp b/test/test_entity.cpp index 5f5aaaab4a5f6727f38ea30c0205ea8d79a925b1..a42e91781922c5e8b9530663b5bdbc53d4532fe3 100644 --- a/test/test_entity.cpp +++ b/test/test_entity.cpp @@ -39,6 +39,7 @@ #include <stdexcept> // for out_of_range #include <string> // for operator+, to_string #include <utility> // for move +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::entity { using caosdb::entity::v1::IdResponse; diff --git a/test/test_issues.cpp b/test/test_issues.cpp index 389c8ee40c12a89a347a811cca5a4e7dd756068e..b7a6cc8fe7df2bbcf8b2db0902c57c7146b77faf 100644 --- a/test/test_issues.cpp +++ b/test/test_issues.cpp @@ -37,11 +37,13 @@ TEST(test_issues, test_issue_11) { Connection connection(configuration); auto transaction = connection.CreateTransaction(); - ASSERT_EQ(transaction->GetResultSet().size(), 0); + EXPECT_EQ(transaction->GetResultSet().size(), 0); transaction->RetrieveById("100"); - ASSERT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously()); + EXPECT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously()); // Trying to obtain ResultSet while it is still empty. - ASSERT_EQ(transaction->GetResultSet().size(), 0); + EXPECT_EQ(transaction->GetResultSet().size(), 0); + transaction->WaitForIt(); + EXPECT_EQ(transaction->GetResultSet().size(), 0); } } // namespace caosdb::transaction diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index 797b6acc37a43a882b83aa540f17124b462add02..e710b6c775615968b567b6efa64ba76e56717c69 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -37,6 +37,7 @@ #include <string> // for string, basic_string #include <utility> // for move #include <vector> // for vector +// IWYU pragma: no_include "net/proto2/public/repeated_field.h" namespace caosdb::transaction { using caosdb::configuration::InsecureConnectionConfiguration; @@ -209,7 +210,7 @@ TEST(test_transaction, test_retrieve_and_download) { EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); - EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); } TEST(test_transaction, test_insert_with_file) { @@ -226,7 +227,7 @@ TEST(test_transaction, test_insert_with_file) { EXPECT_EQ(transaction->GetUploadFiles().size(), 1); transaction->ExecuteAsynchronously(); - EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::FILE_UPLOAD_ERROR); } TEST(test_transaction, test_copy_result_set) { @@ -251,4 +252,39 @@ TEST(test_transaction, test_copy_result_set) { } } +TEST(test_transaction, test_multiple_execute) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveById("asdf"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); + transaction->Cancel(); +} + +TEST(test_transaction, test_multiple_wait_for_it) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveById("asdf"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); +} + } // namespace caosdb::transaction