diff --git a/include/caosdb/file_transmission/download_request_handler.h b/include/caosdb/file_transmission/download_request_handler.h index 3af72cc5939ced7a2f6c2edf86981cc8ef62cbb0..cd3d05b4144acc54bcf39d8a584a1e5d4466a7fa 100644 --- a/include/caosdb/file_transmission/download_request_handler.h +++ b/include/caosdb/file_transmission/download_request_handler.h @@ -83,7 +83,11 @@ public: DownloadRequestHandler(DownloadRequestHandler &&) = delete; DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete; - void Start() override { OnNext(true); } + void Start() override { + if (state_ == CallState::NewCall) { + OnNext(true); + } + } bool OnNext(bool ok) override; diff --git a/include/caosdb/file_transmission/upload_request_handler.h b/include/caosdb/file_transmission/upload_request_handler.h index 9965d54ef870c0ab0ae21b65e456859fb1ef34a3..81d84fac70c2a45ad6f4f18a65ee4374dfd31c71 100644 --- a/include/caosdb/file_transmission/upload_request_handler.h +++ b/include/caosdb/file_transmission/upload_request_handler.h @@ -81,7 +81,11 @@ public: UploadRequestHandler(UploadRequestHandler &&) = delete; UploadRequestHandler &operator=(UploadRequestHandler &&) = delete; - void Start() override { OnNext(true); } + void Start() override { + if (state_ == CallState::NewCall) { + OnNext(true); + } + } bool OnNext(bool ok) override; diff --git a/include/caosdb/logging.h b/include/caosdb/logging.h index d684c47b26e1252df0540562a01f0a221fa83f02..a119c9cfa2b7c740a63ea355e3ede80187a41305 100644 --- a/include/caosdb/logging.h +++ b/include/caosdb/logging.h @@ -24,12 +24,17 @@ #define CAOSDB_LOGGING_H #include "caosdb/log_level.h" // for CAOSDB_LOG_... -#include "boost/log/sources/global_logger_storage.hpp" // for BOOST_LOG_I... -#include "boost/log/sources/record_ostream.hpp" // IWYU pragma: keep -#include "boost/log/sources/severity_channel_logger.hpp" // for BOOST_LOG_C... -#include "boost/log/utility/setup/settings.hpp" // for settings -#include "boost/smart_ptr/intrusive_ptr.hpp" // for intrusive_ptr -#include "boost/smart_ptr/intrusive_ref_counter.hpp" // for intrusive_p... +#include <boost/log/sources/global_logger_storage.hpp> // for BOOST_LOG_I... +#include <boost/log/sources/record_ostream.hpp> // IWYU pragma: keep +#include <boost/log/sources/severity_channel_logger.hpp> // for BOOST_LOG_C... +#include <boost/log/utility/setup/settings.hpp> // for settings +#include <boost/smart_ptr/intrusive_ptr.hpp> // for intrusive_ptr +#include <boost/smart_ptr/intrusive_ref_counter.hpp> // for intrusive_p... +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostri... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SE... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SE... +#include <iosfwd> // for streamsize #include <memory> // for shared_ptr #include <string> // for string #include <vector> // for vector @@ -51,12 +56,16 @@ BOOST_LOG_GLOBAL_LOGGER(logger, boost_logger_class) */ class TraceEnterLeaveLogger { public: - inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name) : channel(channel), function_name(function_name) { - BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Enter " << this->function_name; + inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name) + : channel(channel), function_name(function_name) { + BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) + << "Enter " << this->function_name; } inline ~TraceEnterLeaveLogger() { - BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Leave " << this->function_name; + BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) + << "Leave " << this->function_name; } + private: const std::string &channel; const std::string function_name; diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index d9cf3fd614c8d32da83f2721479e7b57d08c8c83..9d5f3ab07810cb5941680045ef80428f76411b75 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -44,7 +44,7 @@ #include <map> // for map // IWYU pragma: no_include <ext/alloc_traits.h> #include <memory> // for unique_ptr -#include <mutex> // for mutex +#include <mutex> // for mutex #include <string> // for string #include <utility> // for move #include <vector> // for vector @@ -54,7 +54,8 @@ * query) can be added as a sub-request to a transaction. */ #define ASSERT_CAN_ADD_RETRIEVAL \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -90,7 +91,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_DELETION \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -112,7 +114,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_INSERTION \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -134,7 +137,8 @@ * sub-request to a transaction. */ #define ASSERT_CAN_ADD_UPDATE \ - if (!IsStatus(TransactionStatus::INITIAL()) && !IsStatus(TransactionStatus::GO_ON())) { \ + if (this->status.GetCode() != StatusCode::INITIAL && \ + this->status.GetCode() != StatusCode::GO_ON) { \ return StatusCode::TRANSACTION_STATUS_ERROR; \ } \ switch (this->transaction_type) { \ @@ -369,10 +373,6 @@ public: */ auto DeleteById(const std::string &id) noexcept -> StatusCode; - inline auto IsStatus(const TransactionStatus &status) const noexcept -> bool { - return this->status.GetCode() == status.GetCode(); - }; - /** * Execute this transaction in blocking mode and return the status. */ diff --git a/include/caosdb/unary_rpc_handler.h b/include/caosdb/unary_rpc_handler.h index 0d35eec0f03d546b70ae5c1c59aa057896bb575b..a505a8724d67b5434b3535e15bc1ccb9eb635de4 100644 --- a/include/caosdb/unary_rpc_handler.h +++ b/include/caosdb/unary_rpc_handler.h @@ -63,8 +63,10 @@ public: : HandlerInterface(), state_(CallState::NewCall), completion_queue(completion_queue) {} void Start() override { - transaction_status = TransactionStatus::EXECUTING(); - OnNext(true); + if (state_ == CallState::NewCall) { + transaction_status = TransactionStatus::EXECUTING(); + OnNext(true); + } } bool OnNext(bool ok) override; diff --git a/src/caosdb/entity.cpp b/src/caosdb/entity.cpp index cea12b41df4cd9c2dea7bd55c9052f44a7b06fe8..c409a46e8ad37aee93b67b2a36109b1fe8f8c640 100644 --- a/src/caosdb/entity.cpp +++ b/src/caosdb/entity.cpp @@ -20,12 +20,12 @@ * */ #include "caosdb/entity.h" -#include "caosdb/data_type.h" // for DataType -#include "caosdb/entity/v1alpha1/main.pb.h" // for Messages -#include "caosdb/protobuf_helper.h" // for get_arena -#include "caosdb/value.h" // for Value -#include <google/protobuf/arena.h> // for Arena -#include <new> // for operator new +#include "caosdb/data_type.h" // for DataType +#include "caosdb/entity/v1alpha1/main.pb.h" // for Messages +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/value.h" // for Value +#include <google/protobuf/arena.h> // for Arena +#include <new> // for operator new namespace caosdb::entity { using ProtoParent = caosdb::entity::v1alpha1::Parent; diff --git a/src/caosdb/file_transmission/download_request_handler.cpp b/src/caosdb/file_transmission/download_request_handler.cpp index a157754b337e798fc4c144cb44abea153053ed79..be47bf8f359d3531c7f3746a29b7dc6198524aaa 100644 --- a/src/caosdb/file_transmission/download_request_handler.cpp +++ b/src/caosdb/file_transmission/download_request_handler.cpp @@ -123,7 +123,10 @@ bool DownloadRequestHandler::OnNext(bool ok) { return true; } -void DownloadRequestHandler::Cancel() { ctx_.TryCancel(); } +void DownloadRequestHandler::Cancel() { + state_ = CallState::CallComplete; + ctx_.TryCancel(); +} void DownloadRequestHandler::handleNewCallState() { CAOSDB_LOG_TRACE(logger_name) << "Enter DownloadRequestHandler::handleNewCallState. local_path = " diff --git a/src/caosdb/file_transmission/upload_request_handler.cpp b/src/caosdb/file_transmission/upload_request_handler.cpp index cc8170249b932fb8bfa572fd345746bbbacc16af..0549db3794e2b1de8716e0867faed296ad179c0f 100644 --- a/src/caosdb/file_transmission/upload_request_handler.cpp +++ b/src/caosdb/file_transmission/upload_request_handler.cpp @@ -126,7 +126,10 @@ bool UploadRequestHandler::OnNext(bool ok) { return true; } -void UploadRequestHandler::Cancel() { ctx_.TryCancel(); } +void UploadRequestHandler::Cancel() { + state_ = CallState::CallComplete; + ctx_.TryCancel(); +} void UploadRequestHandler::handleNewCallState() { auto filename = file_descriptor_.local_path; diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 7b686ca055e7111afebad9f668f9da80091d2065..0a17bc9790cac73592342292bd62a32855b75e27 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -29,12 +29,12 @@ #include "caosdb/transaction_handler.h" // for EntityTransactionHandler #include <algorithm> // for max // IWYU pragma: no_include <bits/exception.h> -#include <boost/filesystem/path.hpp> // for operator<<, path -#include <boost/log/core/record.hpp> // for record -#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... -#include <boost/log/sources/record_ostream.hpp> // for basic_record_... -#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... -#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... // IWYU pragma: no_include <cxxabi.h> #include <exception> // IWYU pragma: keep #include <future> // for async, future @@ -45,10 +45,9 @@ #include <iosfwd> // for streamsize #include <map> // for map, operator!= #include <memory> // for unique_ptr -#include <system_error> // for std::system_error +#include <system_error> // for std::system_error #include <utility> // for move, pair - namespace caosdb::transaction { using caosdb::entity::v1alpha1::EntityTransactionService; using caosdb::entity::v1alpha1::FileTransmissionService; @@ -201,9 +200,14 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(get_arena()); auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); - handler_ = - std::make_unique<RegisterFileUploadHandler>(&handler_, file_service.get(), &completion_queue, - registration_request, registration_response); + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + handler_ = std::make_unique<RegisterFileUploadHandler>( + &handler_, file_service.get(), &completion_queue, registration_request, + registration_response); + } + } this->status = ProcessCalls(); if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { @@ -214,9 +218,14 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { for (auto &file_descriptor : upload_files) { file_descriptor.file_transmission_id->set_registration_id( registration_response->registration_id()); - CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; - handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(), - &completion_queue, file_descriptor); + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; + handler_ = std::make_unique<UploadRequestHandler>(&handler_, file_service.get(), + &completion_queue, file_descriptor); + } + } this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { // this indicates an error during the upload @@ -225,10 +234,15 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { } } - if(this->status.GetCode() == StatusCode::EXECUTING) { - CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); - handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), - &completion_queue, request, response); + if (this->status.GetCode() == StatusCode::EXECUTING) { + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); + handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), + &completion_queue, request, response); + } + } this->status = ProcessCalls(); } @@ -249,13 +263,19 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { for (const auto &item : download_files) { auto file_descriptor(item.second); - CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path; + { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() == StatusCode::EXECUTING) { + CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path; - handler_ = std::make_unique<DownloadRequestHandler>(&handler_, file_service.get(), - &completion_queue, file_descriptor); + handler_ = std::make_unique<DownloadRequestHandler>(&handler_, file_service.get(), + &completion_queue, file_descriptor); + } + } this->status = ProcessCalls(); if (this->status.GetCode() != StatusCode::EXECUTING) { // this indicates an error during the download + break; return this->status.GetCode(); } } @@ -264,7 +284,8 @@ auto Transaction::DoExecuteTransaction() noexcept -> StatusCode { } auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { - if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) { + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + if (this->status.GetCode() != StatusCode::READY && this->status.GetCode() != StatusCode::GO_ON) { return StatusCode::TRANSACTION_STATUS_ERROR; } switch (this->transaction_type) { @@ -386,7 +407,7 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { this->transaction_future.wait(); - if(this->status.GetCode() != StatusCode::EXECUTING) { + if (this->status.GetCode() != StatusCode::EXECUTING) { return this->status; } @@ -395,6 +416,13 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { auto Transaction::ProcessCalls() -> TransactionStatus { CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessCalls()") + if (this->status.GetCode() != StatusCode::EXECUTING) { + CAOSDB_LOG_ERROR(logger_name) + << "Transaction::ProcessCalls() was called, TransactionStatus was: " + << std::to_string(this->status.GetCode()) << " - " << this->status.GetDescription(); + return status; + } + gpr_timespec deadline; deadline.tv_sec = 1; deadline.tv_nsec = 0; @@ -445,8 +473,18 @@ auto Transaction::ProcessCalls() -> TransactionStatus { } Transaction::~Transaction() { - CAOSDB_LOG_TRACE(logger_name) << "Enter Transaction::~Transaction()"; + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::~Transaction()") this->Cancel(); +} + +void Transaction::Cancel() { + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::Cancel()") + const std::lock_guard<std::mutex> lock(this->transaction_mutex); + + this->status = TransactionStatus::CANCELLED(); + if (handler_ != nullptr) { + handler_->Cancel(); + } completion_queue.Shutdown(); @@ -456,15 +494,6 @@ Transaction::~Transaction() { while (completion_queue.Next(&ignoredTag, &ok)) { ; } - CAOSDB_LOG_TRACE(logger_name) << "Leave Transaction::~Transaction()"; -} - -void Transaction::Cancel() { - // TODO(tf) State Canceled - this->status = TransactionStatus::CANCELLED(); - if (handler_ != nullptr) { - handler_->Cancel(); - } } } // namespace caosdb::transaction diff --git a/src/caosdb/transaction_handler.cpp b/src/caosdb/transaction_handler.cpp index e97e626139dd967353491310627f96acefbdb8eb..d6cc8d3270949b0bb79e6cc2828c75ad9d772ae9 100644 --- a/src/caosdb/transaction_handler.cpp +++ b/src/caosdb/transaction_handler.cpp @@ -22,17 +22,13 @@ EntityTransactionHandler::EntityTransactionHandler(HandlerTag tag, response_(response) {} void EntityTransactionHandler::handleNewCallState() { - CAOSDB_LOG_TRACE(logger_name) << "Enter EntityTransactionHandler::handleNewCallState with " - "CompletionQueue " - << completion_queue; + CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "EntityTransactionHandler::handleNewCallState()") rpc_ = stub_->PrepareAsyncMultiTransaction(&call_context, *request_, completion_queue); state_ = CallState::CallComplete; rpc_->StartCall(); rpc_->Finish(response_, &status_, tag_); - - CAOSDB_LOG_TRACE(logger_name) << "Leave EntityTransactionHandler::handleNewCallState"; } } // namespace caosdb::transaction diff --git a/src/caosdb/unary_rpc_handler.cpp b/src/caosdb/unary_rpc_handler.cpp index 90e007e816feb48a68f5eb150e6f70267d24952a..0c324624a07b532fa28e9f58baeb5860eb2783b9 100644 --- a/src/caosdb/unary_rpc_handler.cpp +++ b/src/caosdb/unary_rpc_handler.cpp @@ -105,7 +105,11 @@ bool UnaryRpcHandler::OnNext(bool ok) { return false; } -void UnaryRpcHandler::Cancel() { transaction_status = TransactionStatus::CANCELLED(); call_context.TryCancel(); } +void UnaryRpcHandler::Cancel() { + state_ = CallState::CallComplete; + transaction_status = TransactionStatus::CANCELLED(); + call_context.TryCancel(); +} void UnaryRpcHandler::handleCallCompleteState() { CAOSDB_LOG_TRACE(logger_name) << "Enter UnaryRpcHandler::handleCallCompleteState"; diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index a98fb469eba1687b093d0884e3eb70ffa8bfc510..eff94329bd47d08675095f39c818e6cc38a10127 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -246,4 +246,38 @@ TEST(test_transaction, test_copy_result_set) { } } +TEST(test_transaction, test_multiple_execute) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveById("asdf"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::TRANSACTION_STATUS_ERROR); +} + +TEST(test_transaction, test_multiple_wait_for_it) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveById("asdf"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); + EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR); +} + } // namespace caosdb::transaction