diff --git a/include/caosdb/logging.h b/include/caosdb/logging.h index 35c5fdfa1a72ba5f55933f3bd99aa93f313c3e33..d684c47b26e1252df0540562a01f0a221fa83f02 100644 --- a/include/caosdb/logging.h +++ b/include/caosdb/logging.h @@ -42,6 +42,26 @@ typedef boost::log::sources::severity_channel_logger_mt<int, std::string> boost_ BOOST_LOG_GLOBAL_LOGGER(logger, boost_logger_class) +/** + * Helper class for logging the entering and leaving of a function or method. + * + * Please Use the macro + * + * CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, function_name); + */ +class TraceEnterLeaveLogger { +public: + inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name) : channel(channel), function_name(function_name) { + BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Enter " << this->function_name; + } + inline ~TraceEnterLeaveLogger() { + BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Leave " << this->function_name; + } +private: + const std::string &channel; + const std::string function_name; +}; + /** * This class stores the integer log level. */ @@ -199,6 +219,9 @@ void caosdb_log_trace(const char *channel, const char *msg); #define CAOSDB_LOG_TRACE(Channel) \ BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), Channel, CAOSDB_LOG_LEVEL_TRACE) +#define CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(Channel, FunctionName) \ + const caosdb::logging::TraceEnterLeaveLogger trace_enter_leave_logger(Channel, FunctionName); + #define CAOSDB_LOG_ERROR_AND_RETURN_STATUS(Channel, StatusCode, Message) \ CAOSDB_LOG_ERROR(Channel) << "StatusCode (" << StatusCode << ") " \ << caosdb::get_status_description(StatusCode) << ": " << Message; \ diff --git a/include/caosdb/protobuf_helper.h b/include/caosdb/protobuf_helper.h index 8574845b194f0080d2b0f1d07475c4bf1c3f82bd..5573aca71463f3505b8580d82cfc0200487ee05b 100644 --- a/include/caosdb/protobuf_helper.h +++ b/include/caosdb/protobuf_helper.h @@ -22,11 +22,11 @@ #ifndef CAOSDB_PROTOBUF_HELPER_H #define CAOSDB_PROTOBUF_HELPER_H -#include "caosdb/status_code.h" // for StatusCode, SUCCESS -#include <google/protobuf/arena.h> // for Arena -#include <google/protobuf/extension_set.h> // for Arena -#include <google/protobuf/util/json_util.h> // for JsonOptions, MessageToJs... -#include <string> // for string +#include "caosdb/status_code.h" // for StatusCode, SUCCESS +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/generated_message_util.h> // for Arena +#include <google/protobuf/util/json_util.h> // for JsonOptions, MessageToJs... +#include <string> // for string #define CAOSDB_DEBUG_MESSAGE_STRING(message, out) \ std::string out; \ diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index 153ee46f8173d98a13203f31cd6a140999a58689..d9cf3fd614c8d32da83f2721479e7b57d08c8c83 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -30,19 +30,21 @@ #include "caosdb/protobuf_helper.h" // for get_arena #include "caosdb/status_code.h" // for StatusCode #include "caosdb/transaction_status.h" // for StatusCode +#include <algorithm> // for max #include <boost/log/core/record.hpp> // for record #include <boost/log/sources/record_ostream.hpp> // for basic_record_o... #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_E... #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_S... +#include <future> // for async, future #include <google/protobuf/arena.h> // for Arena #include <google/protobuf/generated_message_util.h> // for CreateMessage... #include <google/protobuf/util/json_util.h> // for MessageToJsonS... #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <algorithm> // for max #include <iterator> // for iterator, next #include <map> // for map // IWYU pragma: no_include <ext/alloc_traits.h> #include <memory> // for unique_ptr +#include <mutex> // for mutex #include <string> // for string #include <utility> // for move #include <vector> // for vector @@ -177,6 +179,7 @@ using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionResponse; using caosdb::entity::v1alpha1::RegisterFileUploadRequest; using caosdb::entity::v1alpha1::RegisterFileUploadResponse; +using caosdb::entity::v1alpha1::RetrieveResponse; using caosdb::transaction::TransactionStatus; using TransactionResponseCase = caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; @@ -513,6 +516,13 @@ protected: inline auto GetArena() const -> Arena * { return get_arena(); } private: + auto ProcessTerminated() const noexcept -> TransactionStatus; + auto ProcessRetrieveResponse(RetrieveResponse *retrieve_response, + std::vector<std::unique_ptr<Entity>> *entities, + bool *set_error) const noexcept -> std::unique_ptr<Entity>; + auto DoExecuteTransaction() noexcept -> StatusCode; + mutable std::mutex transaction_mutex; + mutable std::future<StatusCode> transaction_future; grpc::CompletionQueue completion_queue; std::unique_ptr<HandlerInterface> handler_; diff --git a/include/caosdb/transaction_status.h b/include/caosdb/transaction_status.h index bdfd65595090acde2b177dd7486492738901b0e0..d46f3911aa17fbf518fd43b91e3568e4e4740c2f 100644 --- a/include/caosdb/transaction_status.h +++ b/include/caosdb/transaction_status.h @@ -92,6 +92,13 @@ public: * This status means that the transaction has been executed successfully. */ CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(SUCCESS, StatusCode::SUCCESS) + /** + * Factory for a CANCELLED status. + * + * This status means that the transaction has been canceled and should not be + * used anymore. + */ + CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(CANCELLED, StatusCode::CANCELLED) /** * Factory for a CONNECTION_ERROR status. * diff --git a/src/caosdb/entity.cpp b/src/caosdb/entity.cpp index c409a46e8ad37aee93b67b2a36109b1fe8f8c640..cea12b41df4cd9c2dea7bd55c9052f44a7b06fe8 100644 --- a/src/caosdb/entity.cpp +++ b/src/caosdb/entity.cpp @@ -20,12 +20,12 @@ * */ #include "caosdb/entity.h" -#include "caosdb/data_type.h" // for DataType -#include "caosdb/entity/v1alpha1/main.pb.h" // for Messages -#include "caosdb/protobuf_helper.h" // for get_arena -#include "caosdb/value.h" // for Value -#include <google/protobuf/arena.h> // for Arena -#include <new> // for operator new +#include "caosdb/data_type.h" // for DataType +#include "caosdb/entity/v1alpha1/main.pb.h" // for Messages +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/value.h" // for Value +#include <google/protobuf/arena.h> // for Arena +#include <new> // for operator new namespace caosdb::entity { using ProtoParent = caosdb::entity::v1alpha1::Parent; diff --git a/src/caosdb/protobuf_helper.cpp b/src/caosdb/protobuf_helper.cpp index e8bbd07834ead9b561c7e8769ed834527337f7a6..418d14b9c847bc204582f6165fae81bf6adcc156 100644 --- a/src/caosdb/protobuf_helper.cpp +++ b/src/caosdb/protobuf_helper.cpp @@ -19,8 +19,7 @@ * */ #include "caosdb/protobuf_helper.h" -#include <google/protobuf/arena.h> // for Arena -#include <google/protobuf/extension_set.h> // for Arena +#include <google/protobuf/arena.h> // for Arena namespace caosdb::utility { diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 678d6fbd728cdd82158e263a476f4ce63bc19310..7b686ca055e7111afebad9f668f9da80091d2065 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -28,14 +28,16 @@ #include "caosdb/status_code.h" // for StatusCode #include "caosdb/transaction_handler.h" // for EntityTransactionHandler #include <algorithm> // for max +// IWYU pragma: no_include <bits/exception.h> #include <boost/filesystem/path.hpp> // for operator<<, path #include <boost/log/core/record.hpp> // for record #include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... #include <boost/log/sources/record_ostream.hpp> // for basic_record_... #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... -// IWYU pragma: no_include <bits/exception.h> +// IWYU pragma: no_include <cxxabi.h> #include <exception> // IWYU pragma: keep +#include <future> // for async, future #include <google/protobuf/arena.h> // for Arena #include <google/protobuf/generated_message_util.h> // for CreateMessage... #include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec @@ -43,8 +45,10 @@ #include <iosfwd> // for streamsize #include <map> // for map, operator!= #include <memory> // for unique_ptr +#include <system_error> // for std::system_error #include <utility> // for move, pair + namespace caosdb::transaction { using caosdb::entity::v1alpha1::EntityTransactionService; using caosdb::entity::v1alpha1::FileTransmissionService; @@ -53,6 +57,7 @@ using caosdb::entity::v1alpha1::MultiTransactionResponse; using TransactionResponseCase = caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; using RetrieveResponseCase = caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase; +using RetrieveResponse = caosdb::entity::v1alpha1::RetrieveResponse; using ProtoEntity = caosdb::entity::v1alpha1::Entity; using google::protobuf::Arena; using NextStatus = grpc::CompletionQueue::NextStatus; @@ -188,29 +193,13 @@ auto Transaction::Execute() -> TransactionStatus { return status; } -// TODO(tf) This has apparently a cognitive complexity of 39>25 (threshold). -auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT - if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) { - return StatusCode::TRANSACTION_STATUS_ERROR; - } - switch (this->transaction_type) { - case MIXED_READ_AND_WRITE: - CAOSDB_LOG_ERROR_AND_RETURN_STATUS( - logger_name, StatusCode::UNSUPPORTED_FEATURE, - "MIXED_WRITE UNSUPPORTED: The current implementation does not support " - "mixed read and write transactions (containing retrievals, insertions, " - "deletions, and updates in one transaction).") - default: - break; - } - this->status = TransactionStatus::EXECUTING(); - +auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { // upload files first if (!upload_files.empty()) { CAOSDB_LOG_INFO(logger_name) << "Number of files to be uploaded: " << upload_files.size(); - auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(GetArena()); - auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(GetArena()); + auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(get_arena()); + auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); handler_ = std::make_unique<RegisterFileUploadHandler>(&handler_, file_service.get(), &completion_queue, @@ -219,7 +208,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { this->status = TransactionStatus::FILE_UPLOAD_ERROR(); - return StatusCode::EXECUTING; + return this->status.GetCode(); } for (auto &file_descriptor : upload_files) { @@ -230,17 +219,17 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT &completion_queue, file_descriptor); this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { - return StatusCode::EXECUTING; + // this indicates an error during the upload + return this->status.GetCode(); } } } - CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); - handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), - &completion_queue, request, response); - this->status = ProcessCalls(); - if (this->status.GetCode() != StatusCode::EXECUTING) { - return StatusCode::EXECUTING; + if(this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); + handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), + &completion_queue, request, response); + this->status = ProcessCalls(); } // file download afterwards @@ -266,61 +255,84 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT &completion_queue, file_descriptor); this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { - return StatusCode::EXECUTING; + // this indicates an error during the download + return this->status.GetCode(); } } } + return this->status.GetCode(); +} + +auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { + if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) { + return StatusCode::TRANSACTION_STATUS_ERROR; + } + switch (this->transaction_type) { + case MIXED_READ_AND_WRITE: + CAOSDB_LOG_ERROR_AND_RETURN_STATUS( + logger_name, StatusCode::UNSUPPORTED_FEATURE, + "MIXED_WRITE UNSUPPORTED: The current implementation does not support " + "mixed read and write transactions (containing retrievals, insertions, " + "deletions, and updates in one transaction).") + default: + break; + } + this->status = TransactionStatus::EXECUTING(); + + this->transaction_future = + std::async(std::launch::async, [this]() { return this->DoExecuteTransaction(); }); + return StatusCode::EXECUTING; } -// TODO(tf) This has apparently a cognitive complexity of 36>25 (threshold). -auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT - if (this->status.GetCode() != StatusCode::EXECUTING) { - return this->status; +auto Transaction::ProcessRetrieveResponse(RetrieveResponse *retrieve_response, + std::vector<std::unique_ptr<Entity>> *entities, + bool *set_error) const noexcept + -> std::unique_ptr<Entity> { + std::unique_ptr<Entity> result; + switch (retrieve_response->retrieve_response_case()) { + case RetrieveResponseCase::kEntityResponse: { + auto *retrieve_entity_response = retrieve_response->release_entity_response(); + result = std::make_unique<Entity>(retrieve_entity_response); + } break; + case RetrieveResponseCase::kSelectResult: { + CAOSDB_LOG_ERROR(logger_name) << "Results of a SELECT query cannot be " + "processed by this client yet."; + // TODO(tf) Select queries + } break; + case RetrieveResponseCase::kCountResult: { + this->query_count = retrieve_response->count_result(); + } break; + case RetrieveResponseCase::kFindResult: { + std::unique_ptr<Entity> find_result; + for (auto &entity_response : *retrieve_response->mutable_find_result()->mutable_result_set()) { + find_result = std::make_unique<Entity>(&entity_response); + if (find_result->HasErrors()) { + *set_error = true; + } + entities->push_back(std::move(find_result)); + } + } break; + default: + CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase."; + break; } - this->status = TransactionStatus::SUCCESS(); + return result; +} + +auto Transaction::ProcessTerminated() const noexcept -> TransactionStatus { bool set_error = false; auto *responses = this->response->mutable_responses(); std::vector<std::unique_ptr<Entity>> entities; + for (auto &sub_response : *responses) { std::unique_ptr<Entity> result; switch (sub_response.transaction_response_case()) { - case TransactionResponseCase::kRetrieveResponse: { auto *retrieve_response = sub_response.mutable_retrieve_response(); - - switch (retrieve_response->retrieve_response_case()) { - case RetrieveResponseCase::kEntityResponse: { - auto *retrieve_entity_response = retrieve_response->release_entity_response(); - result = std::make_unique<Entity>(retrieve_entity_response); - } break; - case RetrieveResponseCase::kSelectResult: { - CAOSDB_LOG_ERROR(logger_name) << "Results of a SELECT query cannot be " - "processed by this client yet."; - // TODO(tf) Select queries - } break; - case RetrieveResponseCase::kCountResult: { - this->query_count = retrieve_response->count_result(); - } break; - case RetrieveResponseCase::kFindResult: { - std::unique_ptr<Entity> find_result; - for (auto &entity_response : - *retrieve_response->mutable_find_result()->mutable_result_set()) { - find_result = std::make_unique<Entity>(&entity_response); - if (find_result->HasErrors()) { - set_error = true; - } - entities.push_back(std::move(find_result)); - } - } break; - default: - CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase."; - break; - } - + result = ProcessRetrieveResponse(retrieve_response, &entities, &set_error); break; // break TransactionResponseCase::kRetrieveResponse } - case TransactionResponseCase::kInsertResponse: { auto *inserted_id_response = sub_response.mutable_insert_response()->release_id_response(); result = std::make_unique<Entity>(inserted_id_response); @@ -360,12 +372,29 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT if (set_error) { this->status = TransactionStatus::TRANSACTION_ERROR("The request terminated with errors."); + } else { + this->status = TransactionStatus::SUCCESS(); } return this->status; } +auto Transaction::WaitForIt() const noexcept -> TransactionStatus { + if (this->status.GetCode() != StatusCode::EXECUTING) { + return this->status; + } + + this->transaction_future.wait(); + + if(this->status.GetCode() != StatusCode::EXECUTING) { + return this->status; + } + + return ProcessTerminated(); +} + auto Transaction::ProcessCalls() -> TransactionStatus { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessCalls()") gpr_timespec deadline; deadline.tv_sec = 1; deadline.tv_nsec = 0; @@ -373,6 +402,7 @@ auto Transaction::ProcessCalls() -> TransactionStatus { TransactionStatus result = TransactionStatus::EXECUTING(); handler_->Start(); + void *tag = nullptr; bool ok = false; while (true) { @@ -409,12 +439,13 @@ auto Transaction::ProcessCalls() -> TransactionStatus { return result; } } + result = handler_->GetStatus(); - handler_.reset(); return result; } Transaction::~Transaction() { + CAOSDB_LOG_TRACE(logger_name) << "Enter Transaction::~Transaction()"; this->Cancel(); completion_queue.Shutdown(); @@ -425,10 +456,12 @@ Transaction::~Transaction() { while (completion_queue.Next(&ignoredTag, &ok)) { ; } + CAOSDB_LOG_TRACE(logger_name) << "Leave Transaction::~Transaction()"; } void Transaction::Cancel() { // TODO(tf) State Canceled + this->status = TransactionStatus::CANCELLED(); if (handler_ != nullptr) { handler_->Cancel(); } diff --git a/src/caosdb/unary_rpc_handler.cpp b/src/caosdb/unary_rpc_handler.cpp index 9a61a534ec30da943f7637d9995a2595acf96804..90e007e816feb48a68f5eb150e6f70267d24952a 100644 --- a/src/caosdb/unary_rpc_handler.cpp +++ b/src/caosdb/unary_rpc_handler.cpp @@ -105,7 +105,7 @@ bool UnaryRpcHandler::OnNext(bool ok) { return false; } -void UnaryRpcHandler::Cancel() { call_context.TryCancel(); } +void UnaryRpcHandler::Cancel() { transaction_status = TransactionStatus::CANCELLED(); call_context.TryCancel(); } void UnaryRpcHandler::handleCallCompleteState() { CAOSDB_LOG_TRACE(logger_name) << "Enter UnaryRpcHandler::handleCallCompleteState"; diff --git a/test/test_issues.cpp b/test/test_issues.cpp index 389c8ee40c12a89a347a811cca5a4e7dd756068e..3e92f4e71d3055575058dcfba5ff3a9ab9de8596 100644 --- a/test/test_issues.cpp +++ b/test/test_issues.cpp @@ -37,11 +37,11 @@ TEST(test_issues, test_issue_11) { Connection connection(configuration); auto transaction = connection.CreateTransaction(); - ASSERT_EQ(transaction->GetResultSet().size(), 0); + EXPECT_EQ(transaction->GetResultSet().size(), 0); transaction->RetrieveById("100"); - ASSERT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously()); + EXPECT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously()); // Trying to obtain ResultSet while it is still empty. - ASSERT_EQ(transaction->GetResultSet().size(), 0); + EXPECT_EQ(transaction->GetResultSet().size(), 0); } } // namespace caosdb::transaction diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index 7c93ac5d9b081ac4cb2314f34272f54cba0f61c9..a98fb469eba1687b093d0884e3eb70ffa8bfc510 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -204,7 +204,7 @@ TEST(test_transaction, test_retrieve_and_download) { EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); - EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); } TEST(test_transaction, test_insert_with_file) { @@ -221,7 +221,7 @@ TEST(test_transaction, test_insert_with_file) { EXPECT_EQ(transaction->GetUploadFiles().size(), 1); transaction->ExecuteAsynchronously(); - EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::FILE_UPLOAD_ERROR); } TEST(test_transaction, test_copy_result_set) {