From 142dece42f9633bb4310e6123934a31e604713d4 Mon Sep 17 00:00:00 2001 From: Timm Fitschen <t.fitschen@indiscale.com> Date: Wed, 22 Sep 2021 03:24:40 +0200 Subject: [PATCH] EHN: Add locking to async methods --- .../download_request_handler.h | 6 +- .../upload_request_handler.h | 6 +- include/caosdb/logging.h | 27 ++++-- include/caosdb/transaction.h | 18 ++-- include/caosdb/unary_rpc_handler.h | 6 +- src/caosdb/entity.cpp | 12 +-- .../download_request_handler.cpp | 5 +- .../upload_request_handler.cpp | 5 +- src/caosdb/transaction.cpp | 95 ++++++++++++------- src/caosdb/transaction_handler.cpp | 6 +- src/caosdb/unary_rpc_handler.cpp | 6 +- test/test_transaction.cpp | 34 +++++++ 12 files changed, 157 insertions(+), 69 deletions(-) diff --git a/include/caosdb/file_transmission/download_request_handler.h b/include/caosdb/file_transmission/download_request_handler.h index 3af72cc..cd3d05b 100644 --- a/include/caosdb/file_transmission/download_request_handler.h +++ b/include/caosdb/file_transmission/download_request_handler.h @@ -83,7 +83,11 @@ public: DownloadRequestHandler(DownloadRequestHandler &&) = delete; DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete; - void Start() override { OnNext(true); } + void Start() override { + if (state_ == CallState::NewCall) { + OnNext(true); + } + } bool OnNext(bool ok) override; diff --git a/include/caosdb/file_transmission/upload_request_handler.h b/include/caosdb/file_transmission/upload_request_handler.h index 9965d54..81d84fa 100644 --- a/include/caosdb/file_transmission/upload_request_handler.h +++ b/include/caosdb/file_transmission/upload_request_handler.h @@ -81,7 +81,11 @@ public: UploadRequestHandler(UploadRequestHandler &&) = delete; UploadRequestHandler &operator=(UploadRequestHandler &&) = delete; - void Start() override { OnNext(true); } + void Start() override { + if (state_ == CallState::NewCall) { + OnNext(true); + } + } bool OnNext(bool ok) override; diff --git a/include/caosdb/logging.h b/include/caosdb/logging.h index d684c47..a119c9c 100644 --- a/include/caosdb/logging.h +++ b/include/caosdb/logging.h @@ -24,12 +24,17 @@ #define CAOSDB_LOGGING_H #include "caosdb/log_level.h" // for CAOSDB_LOG_... -#include "boost/log/sources/global_logger_storage.hpp" // for BOOST_LOG_I... -#include "boost/log/sources/record_ostream.hpp" // IWYU pragma: keep -#include "boost/log/sources/severity_channel_logger.hpp" // for BOOST_LOG_C... -#include "boost/log/utility/setup/settings.hpp" // for settings -#include "boost/smart_ptr/intrusive_ptr.hpp" // for intrusive_ptr -#include "boost/smart_ptr/intrusive_ref_counter.hpp" // for intrusive_p... +#include <boost/log/sources/global_logger_storage.hpp> // for BOOST_LOG_I... +#include <boost/log/sources/record_ostream.hpp> // IWYU pragma: keep +#include <boost/log/sources/severity_channel_logger.hpp> // for BOOST_LOG_C... +#include <boost/log/utility/setup/settings.hpp> // for settings +#include <boost/smart_ptr/intrusive_ptr.hpp> // for intrusive_ptr +#include <boost/smart_ptr/intrusive_ref_counter.hpp> // for intrusive_p... +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostri... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SE... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SE... +#include <iosfwd> // for streamsize #include <memory> // for shared_ptr #include <string> // for string #include <vector> // for vector @@ -51,12 +56,16 @@ BOOST_LOG_GLOBAL_LOGGER(logger, boost_logger_class) */ class TraceEnterLeaveLogger { public: - inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name) : channel(channel), function_name(function_name) { - BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Enter " << this->function_name; + inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name) + : channel(channel), function_name(function_name) { + BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) + << "Enter " << this->function_name; } inline ~TraceEnterLeaveLogger() { - BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Leave " << this->function_name; + BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) + << "Leave " << this->function_name; } + private: const std::string &channel; const std::string function_name; diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index d9cf3fd..9d5f3ab 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -44,7 +44,7 @@ #include <map> // for map // IWYU pragma: no_include <ext/alloc_traits.h> #include <memory> // for unique_ptr -#include <mutex> // for mutex +#include <mutex> // for mutex #include <string> // for string #include <utility> // for move #include <vector> // for vector @@ -54,7 +54,8 @@ * query) can be added as a sub-request to a transaction. */ #define ASSERT_CAN_ADD_RETRIEVAL \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -90,7 +91,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_DELETION \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -112,7 +114,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_INSERTION \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -134,7 +137,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_UPDATE \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -369,10 +373,6 @@ public: */ auto DeleteById(const std::string &id) noexcept -> StatusCode; - inline auto IsStatus(const TransactionStatus &status) const noexcept -> bool { - return this->status.GetCode() == status.GetCode(); - }; - /** * Execute this transaction in blocking mode and return the status. */ diff --git a/include/caosdb/unary_rpc_handler.h b/include/caosdb/unary_rpc_handler.h index 0d35eec..a505a87 100644 --- a/include/caosdb/unary_rpc_handler.h +++ b/include/caosdb/unary_rpc_handler.h @@ -63,8 +63,10 @@ public: : HandlerInterface(), state_(CallState::NewCall), completion_queue(completion_queue) {} void Start() override { - transaction_status = TransactionStatus::EXECUTING(); - OnNext(true); + if (state_ == CallState::NewCall) { + transaction_status = TransactionStatus::EXECUTING(); + OnNext(true); + } } bool OnNext(bool ok) override; diff --git a/src/caosdb/entity.cpp b/src/caosdb/entity.cpp index cea12b4..c409a46 100644 --- a/src/caosdb/entity.cpp +++ b/src/caosdb/entity.cpp @@ -20,12 +20,12 @@ * */ #include "caosdb/entity.h" -#include "caosdb/data_type.h" // for DataType -#include "caosdb/entity/v1alpha1/main.pb.h" // for Messages -#include "caosdb/protobuf_helper.h" // for get_arena -#include "caosdb/value.h" // for Value -#include <google/protobuf/arena.h> // for Arena -#include <new> // for operator new +#include "caosdb/data_type.h" // for DataType +#include "caosdb/entity/v1alpha1/main.pb.h" // for Messages +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/value.h" // for Value +#include <google/protobuf/arena.h> // for Arena +#include <new> // for operator new namespace caosdb::entity { using ProtoParent = caosdb::entity::v1alpha1::Parent; diff --git a/src/caosdb/file_transmission/download_request_handler.cpp b/src/caosdb/file_transmission/download_request_handler.cpp index a157754..be47bf8 100644 --- a/src/caosdb/file_transmission/download_request_handler.cpp +++ b/src/caosdb/file_transmission/download_request_handler.cpp @@ -123,7 +123,10 @@ bool DownloadRequestHandler::OnNext(bool ok) { return true; } -void DownloadRequestHandler::Cancel() { ctx_.TryCancel(); } +void DownloadRequestHandler::Cancel() { + state_ = CallState::CallComplete; + ctx_.TryCancel(); +} void DownloadRequestHandler::handleNewCallState() { CAOSDB_LOG_TRACE(logger_name) << "Enter DownloadRequestHandler::handleNewCallState. local_path = " diff --git a/src/caosdb/file_transmission/upload_request_handler.cpp b/src/caosdb/file_transmission/upload_request_handler.cpp index cc81702..0549db3 100644 --- a/src/caosdb/file_transmission/upload_request_handler.cpp +++ b/src/caosdb/file_transmission/upload_request_handler.cpp @@ -126,7 +126,10 @@ bool UploadRequestHandler::OnNext(bool ok) { return true; } -void UploadRequestHandler::Cancel() { ctx_.TryCancel(); } +void UploadRequestHandler::Cancel() { + state_ = CallState::CallComplete; + ctx_.TryCancel(); +} void UploadRequestHandler::handleNewCallState() { auto filename = file_descriptor_.local_path; diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 7b686ca..0a17bc9 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -29,12 +29,12 @@ #include "caosdb/transaction_handler.h" // for EntityTransactionHandler #include <algorithm> // for max // IWYU pragma: no_include <bits/exception.h> -#include <boost/filesystem/path.hpp> // for operator<<, path -#include <boost/log/core/record.hpp> // for record -#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... -#include <boost/log/sources/record_ostream.hpp> // for basic_record_... -#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... -#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... // IWYU pragma: no_include <cxxabi.h> #include <exception> // IWYU pragma: keep #include <future> // for async, future @@ -45,10 +45,9 @@ #include <iosfwd> // for streamsize #include <map> // for map, operator!= #include <memory> // for unique_ptr -#include <system_error> // for std::system_error +#include <system_error> // for std::system_error #include <utility> // for move, pair - namespace caosdb::transaction { using caosdb::entity::v1alpha1::EntityTransactionService; using caosdb::entity::v1alpha1::FileTransmissionService; @@ -201,9 +200,14 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(get_arena()); auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); - handler_ = - std::make_unique<RegisterFileUploadHandler>(&handler_, file_service.get(), &completion_queue, - registration_request, registration_response); + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + handler_ = std::make_unique<RegisterFileUploadHandler>( + &handler_, file_service.get(), &completion_queue, registration_request, + registration_response); + } + } this->status = ProcessCalls(); if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { @@ -214,9 +218,14 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { for (auto &file_descriptor : upload_files) { file_descriptor.file_transmission_id->set_registration_id( registration_response->registration_id()); - CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; - handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(), - &completion_queue, file_descriptor); + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; + handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(), + &completion_queue, file_descriptor); + } + } this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { // this indicates an error during the upload @@ -225,10 +234,15 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { } } - if(this->status.GetCode() == StatusCode::EXECUTING) { - CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); - handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), - &completion_queue, request, response); + if (this->status.GetCode() == StatusCode::EXECUTING) { + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); + handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), + &completion_queue, request, response); + } + } this->status = ProcessCalls(); } @@ -249,13 +263,19 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { for (const auto &item : download_files) { auto file_descriptor(item.second); - CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path; + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path; - handler_ = std::make_unique<DownloadRequestHandler>(&handler_, file_service.get(), - &completion_queue, file_descriptor); + handler_ = std::make_unique<DownloadRequestHandler>(&handler_, file_service.get(), + &completion_queue, file_descriptor); + } + } this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { // this indicates an error during the download + break; return this->status.GetCode(); } } @@ -264,7 +284,8 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { } auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { - if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() != StatusCode::READY && this->status.GetCode() != StatusCode::GO_ON) { return StatusCode::TRANSACTION_STATUS_ERROR; } switch (this->transaction_type) { @@ -386,7 +407,7 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { this->transaction_future.wait(); - if(this->status.GetCode() != StatusCode::EXECUTING) { + if (this->status.GetCode() != StatusCode::EXECUTING) { return this->status; } @@ -395,6 +416,13 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { auto Transaction::ProcessCalls() -> TransactionStatus { CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessCalls()") + if (this->status.GetCode() != StatusCode::EXECUTING) { + CAOSDB_LOG_ERROR(logger_name) + << "Transaction::ProcessCalls() was called, TransactionStatus was: " + << std::to_string(this->status.GetCode()) << " - " << this->status.GetDescription(); + return status; + } + gpr_timespec deadline; deadline.tv_sec = 1; deadline.tv_nsec = 0; @@ -445,8 +473,18 @@ auto Transaction::ProcessCalls() -> TransactionStatus { } Transaction::~Transaction() { - CAOSDB_LOG_TRACE(logger_name) << "Enter Transaction::~Transaction()"; + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::~Transaction()") this->Cancel(); +} + +void Transaction::Cancel() { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::Cancel()") + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + + this->status = TransactionStatus::CANCELLED(); + if (handler_ != nullptr) { + handler_->Cancel(); + } completion_queue.Shutdown(); @@ -456,15 +494,6 @@ Transaction::~Transaction() { while (completion_queue.Next(&ignoredTag, &ok)) { ; } - CAOSDB_LOG_TRACE(logger_name) << "Leave Transaction::~Transaction()"; -} - -void Transaction::Cancel() { - // TODO(tf) State Canceled - this->status = TransactionStatus::CANCELLED(); - if (handler_ != nullptr) { - handler_->Cancel(); - } } } // namespace caosdb::transaction diff --git a/src/caosdb/transaction_handler.cpp b/src/caosdb/transaction_handler.cpp index e97e626..d6cc8d3 100644 --- a/src/caosdb/transaction_handler.cpp +++ b/src/caosdb/transaction_handler.cpp @@ -22,17 +22,13 @@ EntityTransactionHandler::EntityTransactionHandler(HandlerTag tag, response_(response) {} void EntityTransactionHandler::handleNewCallState() { - CAOSDB_LOG_TRACE(logger_name) << "Enter EntityTransactionHandler::handleNewCallState with " - "CompletionQueue " - << completion_queue; + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "EntityTransactionHandler::handleNewCallState()") rpc_ = stub_->PrepareAsyncMultiTransaction(&call_context, *request_, completion_queue); state_ = CallState::CallComplete; rpc_->StartCall(); rpc_->Finish(response_, &status_, tag_); - - CAOSDB_LOG_TRACE(logger_name) << "Leave EntityTransactionHandler::handleNewCallState"; } } // namespace caosdb::transaction diff --git a/src/caosdb/unary_rpc_handler.cpp b/src/caosdb/unary_rpc_handler.cpp index 90e007e..0c32462 100644 --- a/src/caosdb/unary_rpc_handler.cpp +++ b/src/caosdb/unary_rpc_handler.cpp @@ -105,7 +105,11 @@ bool UnaryRpcHandler::OnNext(bool ok) { return false; } -void UnaryRpcHandler::Cancel() { transaction_status = TransactionStatus::CANCELLED(); call_context.TryCancel(); } +void UnaryRpcHandler::Cancel() { + state_ = CallState::CallComplete; + transaction_status = TransactionStatus::CANCELLED(); + call_context.TryCancel(); +} void UnaryRpcHandler::handleCallCompleteState() { CAOSDB_LOG_TRACE(logger_name) << "Enter UnaryRpcHandler::handleCallCompleteState"; diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index a98fb46..eff9432 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -246,4 +246,38 @@ TEST(test_transaction, test_copy_result_set) { } } +TEST(test_transaction, test_multiple_execute) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveById("asdf"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); +} + +TEST(test_transaction, test_multiple_wait_for_it) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveById("asdf"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); +} + } // namespace caosdb::transaction -- GitLab