diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 62154f7c6f8f8d42aa478099c63ba8220bb60e0a..af287eba3b9bb6719f9313411ecf8f0ddd42b11f 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -40,12 +40,10 @@ set(libcaosdb_INCL ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/unary_rpc_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/utility.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/register_file_upload_handler.h - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/Client.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/upload_request_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/download_request_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_writer.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_reader.h - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_lock.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_error.h ) diff --git a/include/caosdb/file_transmission/Client.h b/include/caosdb/file_transmission/Client.h deleted file mode 100644 index 3a2b4a8fc487112a4d89d845d0340620261c2fc4..0000000000000000000000000000000000000000 --- a/include/caosdb/file_transmission/Client.h +++ /dev/null @@ -1,41 +0,0 @@ -#include "caosdb/entity.h" // for FileDescriptor -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... -#include "caosdb/handler_interface.h" // for HandlerInterface -#include "caosdb/status_code.h" // for StatusCode -#include "caosdb/transaction_status.h" // for StatusCode -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <memory> // for shared_ptr, uniqu... - -namespace caosdb::transaction { -using caosdb::StatusCode; -using caosdb::entity::FileDescriptor; -using caosdb::entity::v1alpha1::FileTransmissionService; - -class FileExchangeClient final { -public: - FileExchangeClient( - const std::shared_ptr<FileTransmissionService::Stub> &service) - : stub_(service) {} - - ~FileExchangeClient(); - - FileExchangeClient(const FileExchangeClient &) = delete; - FileExchangeClient &operator=(const FileExchangeClient &) = delete; - FileExchangeClient(FileExchangeClient &&) = delete; - FileExchangeClient &operator=(FileExchangeClient &&) = delete; - - StatusCode upload(const FileDescriptor &file_descriptor); - StatusCode download(const FileDescriptor &file_descriptor); - - void Cancel(); - -private: - int processMessages(); - - grpc::CompletionQueue cq_; - - std::shared_ptr<FileTransmissionService::Stub> stub_; - std::unique_ptr<HandlerInterface> handler_; -}; - -} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/download_request_handler.h b/include/caosdb/file_transmission/download_request_handler.h index e6e8f9f4e9aeb53e87766b387b0e6107582e8868..0586c9379c1cf449c85fbcf44f1f213ac2aed945 100644 --- a/include/caosdb/file_transmission/download_request_handler.h +++ b/include/caosdb/file_transmission/download_request_handler.h @@ -31,10 +31,6 @@ public: DownloadRequestHandler(DownloadRequestHandler &&) = delete; DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete; - TransactionStatus GetStatus() override { - return TransactionStatus::EXECUTING(); - } - void Start() override { OnNext(true); } bool OnNext(bool ok) override; diff --git a/include/caosdb/file_transmission/file_error.h b/include/caosdb/file_transmission/file_error.h index f9bae6083fc7f1db4791f82fee474302b77cd420..b34d47f25f2eb28c2fb5b0df2a0bd3ccfe485c95 100644 --- a/include/caosdb/file_transmission/file_error.h +++ b/include/caosdb/file_transmission/file_error.h @@ -5,20 +5,9 @@ namespace caosdb::transaction { -class FileLockError : public std::runtime_error { -public: - FileLockError(const std::string &message) : std::runtime_error(message) {} -}; - class FileIOError : public std::runtime_error { public: FileIOError(const std::string &message) : std::runtime_error(message) {} }; -class FileNotManagedError : public std::runtime_error { -public: - FileNotManagedError(const std::string &message) - : std::runtime_error(message) {} -}; - } // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/file_lock.h b/include/caosdb/file_transmission/file_lock.h deleted file mode 100644 index 40092ea49e842f8848da40c38ee4b26135c846aa..0000000000000000000000000000000000000000 --- a/include/caosdb/file_transmission/file_lock.h +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include <mutex> -#include <shared_mutex> - -namespace caosdb::transaction { - -using FileMutex = std::shared_timed_mutex; -using FileReadLock = std::shared_lock<FileMutex>; -using FileWriteLock = std::unique_lock<FileMutex>; - -} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/file_reader.h b/include/caosdb/file_transmission/file_reader.h index 9c89f833d6cbca0cc444b004874db04563bca97a..1e881eb237e5676511255c5de662008a877ffd1f 100644 --- a/include/caosdb/file_transmission/file_reader.h +++ b/include/caosdb/file_transmission/file_reader.h @@ -1,12 +1,11 @@ #pragma once -#include "caosdb/file_transmission/file_lock.h" // for FileMutex, FileReadLock -#include <boost/filesystem/fstream.hpp> // for ifstream -#include <boost/filesystem/operations.hpp> // for exists -#include <boost/filesystem/path.hpp> // for path -#include <fstream> // for ifstream, size_t -#include <memory> // for shared_ptr -#include <string> // for string +#include <boost/filesystem/fstream.hpp> // for ifstream +#include <boost/filesystem/operations.hpp> // for exists +#include <boost/filesystem/path.hpp> // for path +#include <fstream> // for ifstream, size_t +#include <memory> // for shared_ptr +#include <string> // for string namespace caosdb::transaction { using boost::filesystem::exists; @@ -16,8 +15,6 @@ using boost::filesystem::path; class FileReader final { public: FileReader(boost::filesystem::path filename); - FileReader(boost::filesystem::path filename, - std::shared_ptr<FileMutex> mutexPtr); ~FileReader() = default; @@ -37,9 +34,6 @@ private: std::ifstream stream_; boost::filesystem::path filename_; unsigned long long size_; - - std::shared_ptr<FileMutex> mutexPtr_; - FileReadLock lock_; }; } // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/file_writer.h b/include/caosdb/file_transmission/file_writer.h index 114918c76d7cda3d713633f0581101ddafca0e8b..a300eab18de9164b6dad939ab4fa713f17fdc457 100644 --- a/include/caosdb/file_transmission/file_writer.h +++ b/include/caosdb/file_transmission/file_writer.h @@ -1,18 +1,15 @@ #pragma once -#include "caosdb/file_transmission/file_lock.h" // for FileMutex, FileWriteLock -#include <boost/filesystem/path.hpp> // for path -#include <fstream> // for ofstream -#include <memory> // for shared_ptr -#include <string> // for string +#include <boost/filesystem/path.hpp> // for path +#include <fstream> // for ofstream +#include <memory> // for shared_ptr +#include <string> // for string namespace caosdb::transaction { class FileWriter final { public: FileWriter(boost::filesystem::path filename); - FileWriter(boost::filesystem::path filename, - std::shared_ptr<FileMutex> mutexPtr); ~FileWriter() = default; @@ -29,9 +26,6 @@ private: std::ofstream stream_; boost::filesystem::path filename_; - - std::shared_ptr<FileMutex> mutexPtr_; - FileWriteLock lock_; }; } // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/upload_request_handler.h b/include/caosdb/file_transmission/upload_request_handler.h index 0bfd45581ef9eabbd6ce90dcb4604130225102b6..ca7123babc13d53eada37c25905778e054496538 100644 --- a/include/caosdb/file_transmission/upload_request_handler.h +++ b/include/caosdb/file_transmission/upload_request_handler.h @@ -32,10 +32,6 @@ public: UploadRequestHandler(UploadRequestHandler &&) = delete; UploadRequestHandler &operator=(UploadRequestHandler &&) = delete; - TransactionStatus GetStatus() override { - return TransactionStatus::EXECUTING(); - } - void Start() override { OnNext(true); } bool OnNext(bool ok) override; diff --git a/include/caosdb/handler_interface.h b/include/caosdb/handler_interface.h index ff1a42b047cc7f216cacc63c945e97adb709c66c..4ba563a300175478c8140e9b88c991e2f5321245 100644 --- a/include/caosdb/handler_interface.h +++ b/include/caosdb/handler_interface.h @@ -59,6 +59,8 @@ const static std::string logger_name = "caosdb::transaction"; class HandlerInterface { public: + HandlerInterface() : transaction_status(TransactionStatus::READY()) {} + virtual ~HandlerInterface() = default; virtual void Start() = 0; @@ -67,7 +69,10 @@ public: virtual void Cancel() = 0; - virtual TransactionStatus GetStatus() = 0; + inline TransactionStatus GetStatus() { return this->transaction_status; } + +protected: + TransactionStatus transaction_status; }; using HandlerPtr = std::unique_ptr<HandlerInterface>; diff --git a/include/caosdb/unary_rpc_handler.h b/include/caosdb/unary_rpc_handler.h index 816bd65683eeee0d5aebf34abb386c76cbb544fb..a8f83c19885a17c7ed9434d4752a27831b8642e4 100644 --- a/include/caosdb/unary_rpc_handler.h +++ b/include/caosdb/unary_rpc_handler.h @@ -10,8 +10,8 @@ namespace caosdb::transaction { class UnaryRpcHandler : public HandlerInterface { public: inline UnaryRpcHandler(grpc::CompletionQueue *completion_queue) - : state_(CallState::NewCall), completion_queue(completion_queue), - transaction_status(TransactionStatus::EXECUTING()) {} + : HandlerInterface(), state_(CallState::NewCall), + completion_queue(completion_queue) {} void Start() override { transaction_status = TransactionStatus::EXECUTING(); @@ -22,8 +22,6 @@ public: void Cancel() override; - TransactionStatus GetStatus() override { return transaction_status; } - protected: virtual void handleNewCallState() = 0; virtual void handleReceivingFileState() = 0; @@ -35,7 +33,6 @@ protected: grpc::ClientContext call_context; grpc::Status status_; - TransactionStatus transaction_status; }; } // namespace caosdb::transaction diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7dd382a400699b052bf0edf04948532e1ab410cc..89180ee81fc6b07d4f081c90fd1200530a2d60b6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -31,7 +31,6 @@ set(libcaosdb_SRC ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_handler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/unary_rpc_handler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/register_file_upload_handler.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/client.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/upload_request_handler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/download_request_handler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_writer.cpp diff --git a/src/caosdb/file_transmission/client.cpp b/src/caosdb/file_transmission/client.cpp deleted file mode 100644 index fcb0bbda5d0b87b57629acf2631dab236e4135dd..0000000000000000000000000000000000000000 --- a/src/caosdb/file_transmission/client.cpp +++ /dev/null @@ -1,93 +0,0 @@ -#include "caosdb/file_transmission/Client.h" -#include "caosdb/file_transmission/download_request_handler.h" // for DownloadReq... -#include "caosdb/file_transmission/upload_request_handler.h" // for UploadReque... -#include "caosdb/logging.h" // for CAOSDB_LOG_... -#include "caosdb/status_code.h" // for StatusCode -#include <boost/log/core/record.hpp> // for record -#include <boost/log/sources/record_ostream.hpp> // for basic_recor... -#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SE... -#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SE... -#include <exception> // IWYU pragma: keep -// IWYU pragma: no_include <bits/exception.h> -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQ... - -namespace caosdb::transaction { -using caosdb::StatusCode; - -FileExchangeClient::~FileExchangeClient() { - this->Cancel(); - - cq_.Shutdown(); - - // drain the queue - void *ignoredTag = nullptr; - bool ok = false; - while (cq_.Next(&ignoredTag, &ok)) { - ; - } -} - -StatusCode FileExchangeClient::upload(const FileDescriptor &file_descriptor) { - handler_ = std::make_unique<UploadRequestHandler>(&handler_, stub_.get(), - &cq_, file_descriptor); - - int status = this->processMessages(); - if (status > 0) { - return StatusCode::FILE_UPLOAD_ERROR; - } - return StatusCode::SUCCESS; -} - -StatusCode FileExchangeClient::download(const FileDescriptor &file_descriptor) { - handler_ = std::make_unique<DownloadRequestHandler>(&handler_, stub_.get(), - &cq_, file_descriptor); - - int status = this->processMessages(); - if (status > 0) { - return StatusCode::FILE_DOWNLOAD_ERROR; - } - return StatusCode::SUCCESS; -} - -void FileExchangeClient::Cancel() { - if (handler_) { - handler_->Cancel(); - } -} - -int FileExchangeClient::processMessages() { - try { - handler_->Start(); - void *tag = nullptr; - bool ok = false; - while (true) { - if (cq_.Next(&tag, &ok)) { - if (tag != nullptr) { - // TODO(tf): assert - auto res = handler_->OnNext(ok); - if (!res) { - // TODO(tf): comment - handler_.reset(); - break; - } - } else { - CAOSDB_LOG_ERROR(logger_name) - << "Invalid tag delivered by notification queue."; - } - } else { - CAOSDB_LOG_ERROR(logger_name) - << "Notification queue has been shut down unexpectedly."; - return 1; - } - } - } catch (std::exception &e) { - CAOSDB_LOG_ERROR(logger_name) << "Caught exception: " << e.what(); - return 1; - } catch (...) { - CAOSDB_LOG_ERROR(logger_name) << "Caught unknown exception."; - return 1; - } - return 0; -} - -} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/download_request_handler.cpp b/src/caosdb/file_transmission/download_request_handler.cpp index b0e44c3e60bd49ef64db2fcf64be8af6c00c9b5e..214ac6eeaa1be9986bc4ed92c6df19f3e1b5561b 100644 --- a/src/caosdb/file_transmission/download_request_handler.cpp +++ b/src/caosdb/file_transmission/download_request_handler.cpp @@ -33,7 +33,7 @@ using google::protobuf::Arena; DownloadRequestHandler::DownloadRequestHandler( HandlerTag tag, FileTransmissionService::Stub *stub, grpc::CompletionQueue *cq, FileDescriptor file_descriptor) - : tag_(tag), stub_(stub), cq_(cq), + : HandlerInterface(), tag_(tag), stub_(stub), cq_(cq), request_(Arena::CreateMessage<FileDownloadRequest>(get_arena())), response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())), state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), @@ -50,7 +50,7 @@ bool DownloadRequestHandler::OnNext(bool ok) { this->handleReceivingFileState(); } else if (state_ == CallState::CallComplete) { this->handleCallCompleteState(); - return false; // TODO(tf): comment + return false; } } else { state_ = CallState::CallComplete; @@ -58,19 +58,20 @@ bool DownloadRequestHandler::OnNext(bool ok) { } return true; - } catch (Exception &e) { - throw; } catch (std::exception &e) { - CAOSDB_LOG_ERROR(logger_name) << "Download processing error: " << e.what(); - throw; + CAOSDB_LOG_ERROR(logger_name) + << "DownloadRequestHandler caught an exception: " << e.what(); + transaction_status = TransactionStatus::GENERIC_ERROR(e.what()); + state_ = CallState::CallComplete; } catch (...) { CAOSDB_LOG_ERROR(logger_name) - << "Download processing error: unknown exception caught"; - throw; + << "Transaction error: unknown exception caught"; + transaction_status = TransactionStatus::GENERIC_ERROR( + "DownloadRequestHandler caught an unknown exception"); + state_ = CallState::CallComplete; } if (state_ == CallState::NewCall) { - // TODO(tf): comment return false; } @@ -133,22 +134,26 @@ void DownloadRequestHandler::handleReceivingFileState() { void DownloadRequestHandler::handleCallCompleteState() { CAOSDB_LOG_TRACE(logger_name) << "Enter DownloadRequestHandler::handleCallCompleteState"; + switch (status_.error_code()) { - case grpc::OK: - CAOSDB_LOG_INFO(logger_name) << "[" << file_descriptor_.local_path - << "]: download complete: " << bytesReceived_ - << " bytes received" << std::endl; - break; - - case grpc::UNAUTHENTICATED: - throw AuthenticationError(status_.error_message()); - case grpc::UNAVAILABLE: - throw ConnectionError(status_.error_message()); - default: - throw Exception(StatusCode::GENERIC_RPC_ERROR, - "GRPC error code " + std::to_string(status_.error_code()) + - " - " + status_.error_message()); + case grpc::OK: { + CAOSDB_LOG_INFO(logger_name) + << "DownloadRequestHandler finished successfully (" + << file_descriptor_.local_path << "): Download complete, " + << bytesReceived_ << " bytes received."; + } break; + default: { + auto code(static_cast<StatusCode>(status_.error_code())); + std::string description(get_status_description(code) + + " Original message: " + status_.error_message()); + transaction_status = TransactionStatus(code, description); + CAOSDB_LOG_ERROR(logger_name) + << "DownloadRequestHandler finished with an error (" + << file_descriptor_.local_path << "): Download aborted with code " << code + << " - " << description; + } break; } + CAOSDB_LOG_TRACE(logger_name) << "Leave DownloadRequestHandler::handleCallCompleteState"; } diff --git a/src/caosdb/file_transmission/file_reader.cpp b/src/caosdb/file_transmission/file_reader.cpp index 598cd99866c15e3fd2779329c625cdfb47742859..5251af1faadb878e47e56f589196659800808442 100644 --- a/src/caosdb/file_transmission/file_reader.cpp +++ b/src/caosdb/file_transmission/file_reader.cpp @@ -1,7 +1,6 @@ #include "caosdb/file_transmission/file_reader.h" -#include "caosdb/file_transmission/file_error.h" // for FileIOError, FileLockError +#include "caosdb/file_transmission/file_error.h" // for FileIOError #include <boost/filesystem/path.hpp> // for path -#include <mutex> // for try_to_lock #include <utility> // for move namespace caosdb::transaction { @@ -11,20 +10,7 @@ FileReader::FileReader(boost::filesystem::path filename) this->openFile(); } -FileReader::FileReader(boost::filesystem::path filename, - std::shared_ptr<FileMutex> mutexPtr) - : filename_(std::move(filename)), size_(0), mutexPtr_(std::move(mutexPtr)) { - this->openFile(); -} - void FileReader::openFile() { - if (mutexPtr_) { - lock_ = FileReadLock(*mutexPtr_, std::try_to_lock); - if (!lock_) { - throw FileLockError("Can't lock file for reading: " + filename_.string()); - } - } - stream_.open(filename_, std::ios::binary | std::ios::ate); if (!stream_) { throw FileIOError("Can't open file for reading: " + filename_.string()); diff --git a/src/caosdb/file_transmission/file_writer.cpp b/src/caosdb/file_transmission/file_writer.cpp index a0db572e5cc980fe4341be741e276f80e116fe51..314928ed2e84d5a01908c9aec42ebea8390d77be 100644 --- a/src/caosdb/file_transmission/file_writer.cpp +++ b/src/caosdb/file_transmission/file_writer.cpp @@ -1,7 +1,6 @@ #include "caosdb/file_transmission/file_writer.h" -#include "caosdb/file_transmission/file_error.h" // for FileIOError, FileLockError +#include "caosdb/file_transmission/file_error.h" // for FileIOError #include <boost/filesystem/path.hpp> // for path -#include <mutex> // for try_to_lock #include <utility> // for move namespace caosdb::transaction { @@ -11,20 +10,7 @@ FileWriter::FileWriter(boost::filesystem::path filename) this->openFile(); } -FileWriter::FileWriter(boost::filesystem::path filename, - std::shared_ptr<FileMutex> mutexPtr) - : filename_(std::move(filename)), mutexPtr_(std::move(mutexPtr)) { - this->openFile(); -} - void FileWriter::openFile() { - if (mutexPtr_) { - lock_ = FileWriteLock(*mutexPtr_, std::try_to_lock); - if (!lock_) { - throw FileLockError("Can't lock file for writing: " + filename_.string()); - } - } - stream_.open(filename_, std::ios::binary | std::ios::trunc); if (!stream_) { throw FileIOError("Can't open file for writing: " + filename_.string()); diff --git a/src/caosdb/file_transmission/upload_request_handler.cpp b/src/caosdb/file_transmission/upload_request_handler.cpp index 32e6fa792ea930dba2fea13a2c0ea891307a4839..78e199155776396ec6aa84cb6bf867c3d7e5d40a 100644 --- a/src/caosdb/file_transmission/upload_request_handler.cpp +++ b/src/caosdb/file_transmission/upload_request_handler.cpp @@ -36,7 +36,7 @@ UploadRequestHandler::UploadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub, grpc::CompletionQueue *cq, FileDescriptor file_descriptor) - : tag_(tag), stub_(stub), cq_(cq), + : HandlerInterface(), tag_(tag), stub_(stub), cq_(cq), request_(Arena::CreateMessage<FileUploadRequest>(get_arena())), response_(Arena::CreateMessage<FileUploadResponse>(get_arena())), state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), @@ -46,7 +46,7 @@ bool UploadRequestHandler::OnNext(bool ok) { try { if (state_ == CallState::CallComplete) { this->handleCallCompleteState(); - return false; // TODO(tf): comment + return false; } else if (ok) { if (state_ == CallState::NewCall) { this->handleNewCallState(); @@ -63,17 +63,20 @@ bool UploadRequestHandler::OnNext(bool ok) { } return true; - } catch (Exception &e) { - throw; } catch (std::exception &e) { - CAOSDB_LOG_ERROR(logger_name) << "Upload processing error: " << e.what(); + CAOSDB_LOG_ERROR(logger_name) + << "UploadRequestHandler caught an exception: " << e.what(); + transaction_status = TransactionStatus::GENERIC_ERROR(e.what()); + state_ = CallState::CallComplete; } catch (...) { CAOSDB_LOG_ERROR(logger_name) - << "Upload processing error: unknown exception caught"; + << "Transaction error: unknown exception caught"; + transaction_status = TransactionStatus::GENERIC_ERROR( + "UploadRequestHandler caught an unknown exception"); + state_ = CallState::CallComplete; } if (state_ == CallState::NewCall) { - // TODO(tf): comment return false; } @@ -90,6 +93,7 @@ void UploadRequestHandler::handleNewCallState() { rpc_ = stub_->PrepareAsyncFileUpload(&ctx_, response_, cq_); + transaction_status = TransactionStatus::EXECUTING(); state_ = CallState::SendingHeader; rpc_->StartCall(tag_); } @@ -138,21 +142,31 @@ void UploadRequestHandler::handleExpectingResponseState() { } void UploadRequestHandler::handleCallCompleteState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter UploadRequestHandler::handleCallCompleteState"; + switch (status_.error_code()) { case grpc::OK: { - auto bytesSent = fileReader_ ? fileReader_->fileSize() : 0; + auto bytesSent = fileReader_ != nullptr ? fileReader_->fileSize() : 0; CAOSDB_LOG_INFO(logger_name) - << "[" << file_descriptor_.local_path - << "]: upload complete: " << bytesSent << " bytes sent" << std::endl; + << "UploadRequestHandler finished successfully (" + << file_descriptor_.local_path << "): upload complete, " << bytesSent + << " bytes sent"; + } break; + default: { + auto code(static_cast<StatusCode>(status_.error_code())); + std::string description(get_status_description(code) + + " Original message: " + status_.error_message()); + transaction_status = TransactionStatus(code, description); + CAOSDB_LOG_ERROR(logger_name) + << "UploadRequestHandler finished with an error (" + << file_descriptor_.local_path << "): Upload aborted with code " << code + << " - " << description; } break; - - case grpc::UNAUTHENTICATED: - throw AuthenticationError(status_.error_message()); - case grpc::UNAVAILABLE: - throw ConnectionError(status_.error_message()); - default: - throw Exception(StatusCode::GENERIC_RPC_ERROR, status_.error_message()); } + + CAOSDB_LOG_TRACE(logger_name) + << "Leave UploadRequestHandler::handleCallCompleteState"; } } // namespace caosdb::transaction diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index a2008c74c57e5d0e84c6ba8b35a86f15020eb2a3..59eea7f61a91495bbeceb1b5466ea51eba1f0c33 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -20,9 +20,9 @@ #include "caosdb/transaction.h" #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac... #include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe... -#include "caosdb/file_transmission/Client.h" // for FileExchangeC... #include "caosdb/file_transmission/register_file_upload_handler.h" -#include "caosdb/file_transmission/upload_request_handler.h" // Uplo... +#include "caosdb/file_transmission/upload_request_handler.h" // Upload... +#include "caosdb/file_transmission/download_request_handler.h" // Download... #include "caosdb/logging.h" // for CAOSDB_LOG_FATAL #include "caosdb/protobuf_helper.h" // for get_arena #include "caosdb/status_code.h" // for StatusCode @@ -318,8 +318,10 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { handler_ = std::make_unique<EntityTransactionHandler>( &handler_, entity_service.get(), &completion_queue, request, response); - this->status = ProcessCalls(); + if (this->status.GetCode() != StatusCode::EXECUTING) { + return StatusCode::EXECUTING; + } // file download afterwards if (status.GetCode() == StatusCode::SUCCESS && !download_files.empty()) { @@ -340,15 +342,15 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { } } - FileExchangeClient download_client(file_service); for (const auto &item : download_files) { auto file_descriptor(item.second); - CAOSDB_DEBUG_MESSAGE_STRING(*file_descriptor.file_transmission_id, out) CAOSDB_LOG_INFO(logger_name) - << "Downloading " << file_descriptor.local_path << ", " << out; - auto file_download_status = download_client.download(file_descriptor); - if (file_download_status != StatusCode::SUCCESS) { - this->status = TransactionStatus::FILE_DOWNLOAD_ERROR(); + << "Downloading " << file_descriptor.local_path; + + handler_ = std::make_unique<DownloadRequestHandler>( + &handler_, file_service.get(), &completion_queue, file_descriptor); + this->status = ProcessCalls(); + if (this->status.GetCode() != StatusCode::EXECUTING) { return StatusCode::EXECUTING; } } diff --git a/src/caosdb/unary_rpc_handler.cpp b/src/caosdb/unary_rpc_handler.cpp index b41c13af66cff101fc91fa0ee1d283ffc0289483..060f4c0dc4009afccd6420cb9856cb13a30006e5 100644 --- a/src/caosdb/unary_rpc_handler.cpp +++ b/src/caosdb/unary_rpc_handler.cpp @@ -69,7 +69,6 @@ void UnaryRpcHandler::handleCallCompleteState() { switch (status_.error_code()) { case grpc::OK: - transaction_status = TransactionStatus::SUCCESS(); CAOSDB_LOG_INFO(logger_name) << "UnaryRpcHandler finished successfully."; break; default: