Skip to content
Snippets Groups Projects
Verified Commit e54b19a1 authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP: async transaction execution

parent 8057af27
No related branches found
No related tags found
2 merge requests!42Release 0.2.0,!28F async execute
...@@ -42,6 +42,26 @@ typedef boost::log::sources::severity_channel_logger_mt<int, std::string> boost_ ...@@ -42,6 +42,26 @@ typedef boost::log::sources::severity_channel_logger_mt<int, std::string> boost_
BOOST_LOG_GLOBAL_LOGGER(logger, boost_logger_class) BOOST_LOG_GLOBAL_LOGGER(logger, boost_logger_class)
/**
* Helper class for logging the entering and leaving of a function or method.
*
* Please Use the macro
*
* CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, function_name);
*/
class TraceEnterLeaveLogger {
public:
inline TraceEnterLeaveLogger(const std::string &channel, const std::string &function_name) : channel(channel), function_name(function_name) {
BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Enter " << this->function_name;
}
inline ~TraceEnterLeaveLogger() {
BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), this->channel, CAOSDB_LOG_LEVEL_TRACE) << "Leave " << this->function_name;
}
private:
const std::string &channel;
const std::string function_name;
};
/** /**
* This class stores the integer log level. * This class stores the integer log level.
*/ */
...@@ -199,6 +219,9 @@ void caosdb_log_trace(const char *channel, const char *msg); ...@@ -199,6 +219,9 @@ void caosdb_log_trace(const char *channel, const char *msg);
#define CAOSDB_LOG_TRACE(Channel) \ #define CAOSDB_LOG_TRACE(Channel) \
BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), Channel, CAOSDB_LOG_LEVEL_TRACE) BOOST_LOG_CHANNEL_SEV(caosdb::logging::logger::get(), Channel, CAOSDB_LOG_LEVEL_TRACE)
#define CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(Channel, FunctionName) \
const caosdb::logging::TraceEnterLeaveLogger trace_enter_leave_logger(Channel, FunctionName);
#define CAOSDB_LOG_ERROR_AND_RETURN_STATUS(Channel, StatusCode, Message) \ #define CAOSDB_LOG_ERROR_AND_RETURN_STATUS(Channel, StatusCode, Message) \
CAOSDB_LOG_ERROR(Channel) << "StatusCode (" << StatusCode << ") " \ CAOSDB_LOG_ERROR(Channel) << "StatusCode (" << StatusCode << ") " \
<< caosdb::get_status_description(StatusCode) << ": " << Message; \ << caosdb::get_status_description(StatusCode) << ": " << Message; \
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#include "caosdb/status_code.h" // for StatusCode, SUCCESS #include "caosdb/status_code.h" // for StatusCode, SUCCESS
#include <google/protobuf/arena.h> // for Arena #include <google/protobuf/arena.h> // for Arena
#include <google/protobuf/extension_set.h> // for Arena #include <google/protobuf/generated_message_util.h> // for Arena
#include <google/protobuf/util/json_util.h> // for JsonOptions, MessageToJs... #include <google/protobuf/util/json_util.h> // for JsonOptions, MessageToJs...
#include <string> // for string #include <string> // for string
......
...@@ -30,19 +30,21 @@ ...@@ -30,19 +30,21 @@
#include "caosdb/protobuf_helper.h" // for get_arena #include "caosdb/protobuf_helper.h" // for get_arena
#include "caosdb/status_code.h" // for StatusCode #include "caosdb/status_code.h" // for StatusCode
#include "caosdb/transaction_status.h" // for StatusCode #include "caosdb/transaction_status.h" // for StatusCode
#include <algorithm> // for max
#include <boost/log/core/record.hpp> // for record #include <boost/log/core/record.hpp> // for record
#include <boost/log/sources/record_ostream.hpp> // for basic_record_o... #include <boost/log/sources/record_ostream.hpp> // for basic_record_o...
#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_E... #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_E...
#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_S... #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_S...
#include <future> // for async, future
#include <google/protobuf/arena.h> // for Arena #include <google/protobuf/arena.h> // for Arena
#include <google/protobuf/generated_message_util.h> // for CreateMessage... #include <google/protobuf/generated_message_util.h> // for CreateMessage...
#include <google/protobuf/util/json_util.h> // for MessageToJsonS... #include <google/protobuf/util/json_util.h> // for MessageToJsonS...
#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
#include <algorithm> // for max
#include <iterator> // for iterator, next #include <iterator> // for iterator, next
#include <map> // for map #include <map> // for map
// IWYU pragma: no_include <ext/alloc_traits.h> // IWYU pragma: no_include <ext/alloc_traits.h>
#include <memory> // for unique_ptr #include <memory> // for unique_ptr
#include <mutex> // for mutex
#include <string> // for string #include <string> // for string
#include <utility> // for move #include <utility> // for move
#include <vector> // for vector #include <vector> // for vector
...@@ -177,6 +179,7 @@ using caosdb::entity::v1alpha1::MultiTransactionRequest; ...@@ -177,6 +179,7 @@ using caosdb::entity::v1alpha1::MultiTransactionRequest;
using caosdb::entity::v1alpha1::MultiTransactionResponse; using caosdb::entity::v1alpha1::MultiTransactionResponse;
using caosdb::entity::v1alpha1::RegisterFileUploadRequest; using caosdb::entity::v1alpha1::RegisterFileUploadRequest;
using caosdb::entity::v1alpha1::RegisterFileUploadResponse; using caosdb::entity::v1alpha1::RegisterFileUploadResponse;
using caosdb::entity::v1alpha1::RetrieveResponse;
using caosdb::transaction::TransactionStatus; using caosdb::transaction::TransactionStatus;
using TransactionResponseCase = using TransactionResponseCase =
caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase;
...@@ -513,6 +516,13 @@ protected: ...@@ -513,6 +516,13 @@ protected:
inline auto GetArena() const -> Arena * { return get_arena(); } inline auto GetArena() const -> Arena * { return get_arena(); }
private: private:
auto ProcessTerminated() const noexcept -> TransactionStatus;
auto ProcessRetrieveResponse(RetrieveResponse *retrieve_response,
std::vector<std::unique_ptr<Entity>> *entities,
bool *set_error) const noexcept -> std::unique_ptr<Entity>;
auto DoExecuteTransaction() noexcept -> StatusCode;
mutable std::mutex transaction_mutex;
mutable std::future<StatusCode> transaction_future;
grpc::CompletionQueue completion_queue; grpc::CompletionQueue completion_queue;
std::unique_ptr<HandlerInterface> handler_; std::unique_ptr<HandlerInterface> handler_;
......
...@@ -92,6 +92,13 @@ public: ...@@ -92,6 +92,13 @@ public:
* This status means that the transaction has been executed successfully. * This status means that the transaction has been executed successfully.
*/ */
CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(SUCCESS, StatusCode::SUCCESS) CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(SUCCESS, StatusCode::SUCCESS)
/**
* Factory for a CANCELLED status.
*
* This status means that the transaction has been canceled and should not be
* used anymore.
*/
CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(CANCELLED, StatusCode::CANCELLED)
/** /**
* Factory for a CONNECTION_ERROR status. * Factory for a CONNECTION_ERROR status.
* *
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
*/ */
#include "caosdb/protobuf_helper.h" #include "caosdb/protobuf_helper.h"
#include <google/protobuf/arena.h> // for Arena #include <google/protobuf/arena.h> // for Arena
#include <google/protobuf/extension_set.h> // for Arena
namespace caosdb::utility { namespace caosdb::utility {
......
...@@ -28,14 +28,16 @@ ...@@ -28,14 +28,16 @@
#include "caosdb/status_code.h" // for StatusCode #include "caosdb/status_code.h" // for StatusCode
#include "caosdb/transaction_handler.h" // for EntityTransactionHandler #include "caosdb/transaction_handler.h" // for EntityTransactionHandler
#include <algorithm> // for max #include <algorithm> // for max
// IWYU pragma: no_include <bits/exception.h>
#include <boost/filesystem/path.hpp> // for operator<<, path #include <boost/filesystem/path.hpp> // for operator<<, path
#include <boost/log/core/record.hpp> // for record #include <boost/log/core/record.hpp> // for record
#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... #include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring...
#include <boost/log/sources/record_ostream.hpp> // for basic_record_... #include <boost/log/sources/record_ostream.hpp> // for basic_record_...
#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_...
#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_...
// IWYU pragma: no_include <bits/exception.h> // IWYU pragma: no_include <cxxabi.h>
#include <exception> // IWYU pragma: keep #include <exception> // IWYU pragma: keep
#include <future> // for async, future
#include <google/protobuf/arena.h> // for Arena #include <google/protobuf/arena.h> // for Arena
#include <google/protobuf/generated_message_util.h> // for CreateMessage... #include <google/protobuf/generated_message_util.h> // for CreateMessage...
#include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec #include <grpc/impl/codegen/gpr_types.h> // for gpr_timespec
...@@ -43,8 +45,10 @@ ...@@ -43,8 +45,10 @@
#include <iosfwd> // for streamsize #include <iosfwd> // for streamsize
#include <map> // for map, operator!= #include <map> // for map, operator!=
#include <memory> // for unique_ptr #include <memory> // for unique_ptr
#include <system_error> // for std::system_error
#include <utility> // for move, pair #include <utility> // for move, pair
namespace caosdb::transaction { namespace caosdb::transaction {
using caosdb::entity::v1alpha1::EntityTransactionService; using caosdb::entity::v1alpha1::EntityTransactionService;
using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::entity::v1alpha1::FileTransmissionService;
...@@ -53,6 +57,7 @@ using caosdb::entity::v1alpha1::MultiTransactionResponse; ...@@ -53,6 +57,7 @@ using caosdb::entity::v1alpha1::MultiTransactionResponse;
using TransactionResponseCase = using TransactionResponseCase =
caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase;
using RetrieveResponseCase = caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase; using RetrieveResponseCase = caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase;
using RetrieveResponse = caosdb::entity::v1alpha1::RetrieveResponse;
using ProtoEntity = caosdb::entity::v1alpha1::Entity; using ProtoEntity = caosdb::entity::v1alpha1::Entity;
using google::protobuf::Arena; using google::protobuf::Arena;
using NextStatus = grpc::CompletionQueue::NextStatus; using NextStatus = grpc::CompletionQueue::NextStatus;
...@@ -188,29 +193,13 @@ auto Transaction::Execute() -> TransactionStatus { ...@@ -188,29 +193,13 @@ auto Transaction::Execute() -> TransactionStatus {
return status; return status;
} }
// TODO(tf) This has apparently a cognitive complexity of 39>25 (threshold). auto Transaction::DoExecuteTransaction() noexcept -> StatusCode {
auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT
if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) {
return StatusCode::TRANSACTION_STATUS_ERROR;
}
switch (this->transaction_type) {
case MIXED_READ_AND_WRITE:
CAOSDB_LOG_ERROR_AND_RETURN_STATUS(
logger_name, StatusCode::UNSUPPORTED_FEATURE,
"MIXED_WRITE UNSUPPORTED: The current implementation does not support "
"mixed read and write transactions (containing retrievals, insertions, "
"deletions, and updates in one transaction).")
default:
break;
}
this->status = TransactionStatus::EXECUTING();
// upload files first // upload files first
if (!upload_files.empty()) { if (!upload_files.empty()) {
CAOSDB_LOG_INFO(logger_name) << "Number of files to be uploaded: " << upload_files.size(); CAOSDB_LOG_INFO(logger_name) << "Number of files to be uploaded: " << upload_files.size();
auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(GetArena()); auto *registration_request = Arena::CreateMessage<RegisterFileUploadRequest>(get_arena());
auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(GetArena()); auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena());
handler_ = handler_ =
std::make_unique<RegisterFileUploadHandler>(&handler_, file_service.get(), &completion_queue, std::make_unique<RegisterFileUploadHandler>(&handler_, file_service.get(), &completion_queue,
...@@ -219,7 +208,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT ...@@ -219,7 +208,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT
if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { if (registration_response->status() != RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) {
this->status = TransactionStatus::FILE_UPLOAD_ERROR(); this->status = TransactionStatus::FILE_UPLOAD_ERROR();
return StatusCode::EXECUTING; return this->status.GetCode();
} }
for (auto &file_descriptor : upload_files) { for (auto &file_descriptor : upload_files) {
...@@ -230,17 +219,17 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT ...@@ -230,17 +219,17 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT
&completion_queue, file_descriptor); &completion_queue, file_descriptor);
this->status = ProcessCalls(); this->status = ProcessCalls();
if (this->status.GetCode() != StatusCode::EXECUTING) { if (this->status.GetCode() != StatusCode::EXECUTING) {
return StatusCode::EXECUTING; // this indicates an error during the upload
return this->status.GetCode();
} }
} }
} }
if(this->status.GetCode() == StatusCode::EXECUTING) {
CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString(); CAOSDB_LOG_DEBUG(logger_name) << "RPC Request: " << RequestToString();
handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(), handler_ = std::make_unique<EntityTransactionHandler>(&handler_, entity_service.get(),
&completion_queue, request, response); &completion_queue, request, response);
this->status = ProcessCalls(); this->status = ProcessCalls();
if (this->status.GetCode() != StatusCode::EXECUTING) {
return StatusCode::EXECUTING;
} }
// file download afterwards // file download afterwards
...@@ -266,29 +255,41 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT ...@@ -266,29 +255,41 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT
&completion_queue, file_descriptor); &completion_queue, file_descriptor);
this->status = ProcessCalls(); this->status = ProcessCalls();
if (this->status.GetCode() != StatusCode::EXECUTING) { if (this->status.GetCode() != StatusCode::EXECUTING) {
return StatusCode::EXECUTING; // this indicates an error during the download
return this->status.GetCode();
} }
} }
} }
return StatusCode::EXECUTING; return this->status.GetCode();
} }
// TODO(tf) This has apparently a cognitive complexity of 36>25 (threshold). auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) {
if (this->status.GetCode() != StatusCode::EXECUTING) { return StatusCode::TRANSACTION_STATUS_ERROR;
return this->status;
} }
this->status = TransactionStatus::SUCCESS(); switch (this->transaction_type) {
bool set_error = false; case MIXED_READ_AND_WRITE:
auto *responses = this->response->mutable_responses(); CAOSDB_LOG_ERROR_AND_RETURN_STATUS(
std::vector<std::unique_ptr<Entity>> entities; logger_name, StatusCode::UNSUPPORTED_FEATURE,
for (auto &sub_response : *responses) { "MIXED_WRITE UNSUPPORTED: The current implementation does not support "
std::unique_ptr<Entity> result; "mixed read and write transactions (containing retrievals, insertions, "
switch (sub_response.transaction_response_case()) { "deletions, and updates in one transaction).")
default:
break;
}
this->status = TransactionStatus::EXECUTING();
case TransactionResponseCase::kRetrieveResponse: { this->transaction_future =
auto *retrieve_response = sub_response.mutable_retrieve_response(); std::async(std::launch::async, [this]() { return this->DoExecuteTransaction(); });
return StatusCode::EXECUTING;
}
auto Transaction::ProcessRetrieveResponse(RetrieveResponse *retrieve_response,
std::vector<std::unique_ptr<Entity>> *entities,
bool *set_error) const noexcept
-> std::unique_ptr<Entity> {
std::unique_ptr<Entity> result;
switch (retrieve_response->retrieve_response_case()) { switch (retrieve_response->retrieve_response_case()) {
case RetrieveResponseCase::kEntityResponse: { case RetrieveResponseCase::kEntityResponse: {
auto *retrieve_entity_response = retrieve_response->release_entity_response(); auto *retrieve_entity_response = retrieve_response->release_entity_response();
...@@ -304,23 +305,34 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT ...@@ -304,23 +305,34 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT
} break; } break;
case RetrieveResponseCase::kFindResult: { case RetrieveResponseCase::kFindResult: {
std::unique_ptr<Entity> find_result; std::unique_ptr<Entity> find_result;
for (auto &entity_response : for (auto &entity_response : *retrieve_response->mutable_find_result()->mutable_result_set()) {
*retrieve_response->mutable_find_result()->mutable_result_set()) {
find_result = std::make_unique<Entity>(&entity_response); find_result = std::make_unique<Entity>(&entity_response);
if (find_result->HasErrors()) { if (find_result->HasErrors()) {
set_error = true; *set_error = true;
} }
entities.push_back(std::move(find_result)); entities->push_back(std::move(find_result));
} }
} break; } break;
default: default:
CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase."; CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase.";
break; break;
} }
return result;
}
auto Transaction::ProcessTerminated() const noexcept -> TransactionStatus {
bool set_error = false;
auto *responses = this->response->mutable_responses();
std::vector<std::unique_ptr<Entity>> entities;
for (auto &sub_response : *responses) {
std::unique_ptr<Entity> result;
switch (sub_response.transaction_response_case()) {
case TransactionResponseCase::kRetrieveResponse: {
auto *retrieve_response = sub_response.mutable_retrieve_response();
result = ProcessRetrieveResponse(retrieve_response, &entities, &set_error);
break; // break TransactionResponseCase::kRetrieveResponse break; // break TransactionResponseCase::kRetrieveResponse
} }
case TransactionResponseCase::kInsertResponse: { case TransactionResponseCase::kInsertResponse: {
auto *inserted_id_response = sub_response.mutable_insert_response()->release_id_response(); auto *inserted_id_response = sub_response.mutable_insert_response()->release_id_response();
result = std::make_unique<Entity>(inserted_id_response); result = std::make_unique<Entity>(inserted_id_response);
...@@ -360,12 +372,29 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT ...@@ -360,12 +372,29 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT
if (set_error) { if (set_error) {
this->status = TransactionStatus::TRANSACTION_ERROR("The request terminated with errors."); this->status = TransactionStatus::TRANSACTION_ERROR("The request terminated with errors.");
} else {
this->status = TransactionStatus::SUCCESS();
} }
return this->status; return this->status;
} }
auto Transaction::WaitForIt() const noexcept -> TransactionStatus {
if (this->status.GetCode() != StatusCode::EXECUTING) {
return this->status;
}
this->transaction_future.wait();
if(this->status.GetCode() != StatusCode::EXECUTING) {
return this->status;
}
return ProcessTerminated();
}
auto Transaction::ProcessCalls() -> TransactionStatus { auto Transaction::ProcessCalls() -> TransactionStatus {
CAOSDB_LOG_TRACE_ENTER_AND_LEAVE(logger_name, "Transaction::ProcessCalls()")
gpr_timespec deadline; gpr_timespec deadline;
deadline.tv_sec = 1; deadline.tv_sec = 1;
deadline.tv_nsec = 0; deadline.tv_nsec = 0;
...@@ -373,6 +402,7 @@ auto Transaction::ProcessCalls() -> TransactionStatus { ...@@ -373,6 +402,7 @@ auto Transaction::ProcessCalls() -> TransactionStatus {
TransactionStatus result = TransactionStatus::EXECUTING(); TransactionStatus result = TransactionStatus::EXECUTING();
handler_->Start(); handler_->Start();
void *tag = nullptr; void *tag = nullptr;
bool ok = false; bool ok = false;
while (true) { while (true) {
...@@ -409,12 +439,13 @@ auto Transaction::ProcessCalls() -> TransactionStatus { ...@@ -409,12 +439,13 @@ auto Transaction::ProcessCalls() -> TransactionStatus {
return result; return result;
} }
} }
result = handler_->GetStatus(); result = handler_->GetStatus();
handler_.reset();
return result; return result;
} }
Transaction::~Transaction() { Transaction::~Transaction() {
CAOSDB_LOG_TRACE(logger_name) << "Enter Transaction::~Transaction()";
this->Cancel(); this->Cancel();
completion_queue.Shutdown(); completion_queue.Shutdown();
...@@ -425,10 +456,12 @@ Transaction::~Transaction() { ...@@ -425,10 +456,12 @@ Transaction::~Transaction() {
while (completion_queue.Next(&ignoredTag, &ok)) { while (completion_queue.Next(&ignoredTag, &ok)) {
; ;
} }
CAOSDB_LOG_TRACE(logger_name) << "Leave Transaction::~Transaction()";
} }
void Transaction::Cancel() { void Transaction::Cancel() {
// TODO(tf) State Canceled // TODO(tf) State Canceled
this->status = TransactionStatus::CANCELLED();
if (handler_ != nullptr) { if (handler_ != nullptr) {
handler_->Cancel(); handler_->Cancel();
} }
......
...@@ -105,7 +105,7 @@ bool UnaryRpcHandler::OnNext(bool ok) { ...@@ -105,7 +105,7 @@ bool UnaryRpcHandler::OnNext(bool ok) {
return false; return false;
} }
void UnaryRpcHandler::Cancel() { call_context.TryCancel(); } void UnaryRpcHandler::Cancel() { transaction_status = TransactionStatus::CANCELLED(); call_context.TryCancel(); }
void UnaryRpcHandler::handleCallCompleteState() { void UnaryRpcHandler::handleCallCompleteState() {
CAOSDB_LOG_TRACE(logger_name) << "Enter UnaryRpcHandler::handleCallCompleteState"; CAOSDB_LOG_TRACE(logger_name) << "Enter UnaryRpcHandler::handleCallCompleteState";
......
...@@ -37,11 +37,11 @@ TEST(test_issues, test_issue_11) { ...@@ -37,11 +37,11 @@ TEST(test_issues, test_issue_11) {
Connection connection(configuration); Connection connection(configuration);
auto transaction = connection.CreateTransaction(); auto transaction = connection.CreateTransaction();
ASSERT_EQ(transaction->GetResultSet().size(), 0); EXPECT_EQ(transaction->GetResultSet().size(), 0);
transaction->RetrieveById("100"); transaction->RetrieveById("100");
ASSERT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously()); EXPECT_EQ(StatusCode::EXECUTING, transaction->ExecuteAsynchronously());
// Trying to obtain ResultSet while it is still empty. // Trying to obtain ResultSet while it is still empty.
ASSERT_EQ(transaction->GetResultSet().size(), 0); EXPECT_EQ(transaction->GetResultSet().size(), 0);
} }
} // namespace caosdb::transaction } // namespace caosdb::transaction
...@@ -204,7 +204,7 @@ TEST(test_transaction, test_retrieve_and_download) { ...@@ -204,7 +204,7 @@ TEST(test_transaction, test_retrieve_and_download) {
EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON);
EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING);
EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::CONNECTION_ERROR); EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::CONNECTION_ERROR);
} }
TEST(test_transaction, test_insert_with_file) { TEST(test_transaction, test_insert_with_file) {
...@@ -221,7 +221,7 @@ TEST(test_transaction, test_insert_with_file) { ...@@ -221,7 +221,7 @@ TEST(test_transaction, test_insert_with_file) {
EXPECT_EQ(transaction->GetUploadFiles().size(), 1); EXPECT_EQ(transaction->GetUploadFiles().size(), 1);
transaction->ExecuteAsynchronously(); transaction->ExecuteAsynchronously();
EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR); EXPECT_EQ(transaction->WaitForIt().GetCode(), StatusCode::FILE_UPLOAD_ERROR);
} }
TEST(test_transaction, test_copy_result_set) { TEST(test_transaction, test_copy_result_set) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment