From ef5885d8c6251ee4cdf89ff22cc8596ea4f86bb1 Mon Sep 17 00:00:00 2001 From: Timm Fitschen <t.fitschen@indiscale.com> Date: Tue, 10 Aug 2021 17:12:03 +0200 Subject: [PATCH] WIP: files --- doc/Doxyfile.in | 16 ++-- include/caosdb/configuration.h | 1 + include/caosdb/entity.h | 21 ++++- include/caosdb/filestreaming/FileWriter.h | 12 ++- include/caosdb/protobuf_helper.h | 8 ++ include/caosdb/transaction.h | 7 +- include/caosdb/transaction_status.h | 8 ++ src/caosdb/configuration.cpp | 40 +++++---- .../filestreaming/DownloadRequestHandler.cpp | 67 +++++++++++---- src/caosdb/filestreaming/FileWriter.cpp | 11 +-- src/caosdb/logging.cpp | 6 ++ src/caosdb/transaction.cpp | 86 +++++++++++++++++-- test/test_transaction.cpp | 2 +- 13 files changed, 219 insertions(+), 66 deletions(-) diff --git a/doc/Doxyfile.in b/doc/Doxyfile.in index 798198a..787d718 100644 --- a/doc/Doxyfile.in +++ b/doc/Doxyfile.in @@ -485,7 +485,7 @@ NUM_PROC_THREADS = 1 # normally produced when WARNINGS is set to YES. # The default value is: NO. -EXTRACT_ALL = NO +EXTRACT_ALL = YES # If the EXTRACT_PRIVATE tag is set to YES, all private members of a class will # be included in the documentation. @@ -2317,7 +2317,7 @@ HIDE_UNDOC_RELATIONS = YES # set to NO # The default value is: NO. -HAVE_DOT = NO +HAVE_DOT = YES # The DOT_NUM_THREADS specifies the number of dot invocations doxygen is allowed # to run in parallel. When set to 0 doxygen will base this on the number of @@ -2383,7 +2383,7 @@ GROUP_GRAPHS = YES # The default value is: NO. # This tag requires that the tag HAVE_DOT is set to YES. -UML_LOOK = NO +UML_LOOK = YES # If the UML_LOOK tag is enabled, the fields and methods are shown inside the # class node. If there are many fields or methods and many nodes the graph may @@ -2396,7 +2396,7 @@ UML_LOOK = NO # Minimum value: 0, maximum value: 100, default value: 10. # This tag requires that the tag UML_LOOK is set to YES. -UML_LIMIT_NUM_FIELDS = 10 +UML_LIMIT_NUM_FIELDS = 50 # If the DOT_UML_DETAILS tag is set to NO, doxygen will show attributes and # methods without types and arguments in the UML graphs. If the DOT_UML_DETAILS @@ -2409,7 +2409,7 @@ UML_LIMIT_NUM_FIELDS = 10 # The default value is: NO. # This tag requires that the tag UML_LOOK is set to YES. -DOT_UML_DETAILS = NO +DOT_UML_DETAILS = YES # The DOT_WRAP_THRESHOLD tag can be used to set the maximum number of characters # to display on a single line. If the actual line length exceeds this threshold @@ -2426,7 +2426,7 @@ DOT_WRAP_THRESHOLD = 17 # The default value is: NO. # This tag requires that the tag HAVE_DOT is set to YES. -TEMPLATE_RELATIONS = NO +TEMPLATE_RELATIONS = YES # If the INCLUDE_GRAPH, ENABLE_PREPROCESSING and SEARCH_INCLUDES tags are set to # YES then doxygen will generate a graph for each documented file showing the @@ -2566,7 +2566,7 @@ PLANTUML_INCLUDE_PATH = # Minimum value: 0, maximum value: 10000, default value: 50. # This tag requires that the tag HAVE_DOT is set to YES. -DOT_GRAPH_MAX_NODES = 50 +DOT_GRAPH_MAX_NODES = 100 # The MAX_DOT_GRAPH_DEPTH tag can be used to set the maximum depth of the graphs # generated by dot. A depth value of 3 means that only nodes reachable from the @@ -2590,7 +2590,7 @@ MAX_DOT_GRAPH_DEPTH = 0 # The default value is: NO. # This tag requires that the tag HAVE_DOT is set to YES. -DOT_TRANSPARENT = NO +DOT_TRANSPARENT = YES # Set the DOT_MULTI_TARGETS tag to YES to allow dot to generate multiple output # files in one run (i.e. multiple -o and -T options on the command line). This diff --git a/include/caosdb/configuration.h b/include/caosdb/configuration.h index 80bc927..4910bee 100644 --- a/include/caosdb/configuration.h +++ b/include/caosdb/configuration.h @@ -113,6 +113,7 @@ public: friend class ConfigurationManager; private: + auto ConvertLogLevel(const std::string &string_level) const -> int; auto CreateConsoleSinkConfiguration(const object &from, const std::string &name, int level) const -> std::shared_ptr<caosdb::logging::SinkConfiguration>; diff --git a/include/caosdb/entity.h b/include/caosdb/entity.h index e612326..28d5446 100644 --- a/include/caosdb/entity.h +++ b/include/caosdb/entity.h @@ -31,6 +31,7 @@ #include "caosdb/entity/v1alpha1/main.pb.h" // for ProtoEntity, ProtoParent... #include "caosdb/status_code.h" +#include "caosdb/logging.h" #include "google/protobuf/message.h" // for RepeatedPtrField #include "caosdb/message_code.h" // for get_message_code, Messag... #include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso... @@ -54,6 +55,8 @@ using caosdb::entity::v1alpha1::EntityResponse; using caosdb::entity::v1alpha1::FileTransmissionId; using google::protobuf::RepeatedPtrField; +const std::string logger_name = "caosdb::entity"; + struct FileDescriptor { FileTransmissionId *file_transmission_id; ProtoFileDescriptor *wrapped; @@ -484,9 +487,6 @@ public: auto CopyTo(ProtoEntity *target) -> void; auto SetFilePath(const std::string &path) -> void; - inline auto GetFileTransmissionId() const -> const FileTransmissionId & { - return *(this->file_transmission_id); - } inline auto HasFile() const -> bool { return !this->file_descriptor.local_path.empty(); } @@ -502,18 +502,32 @@ public: return this->file_descriptor; } + inline auto GetLocalPath() const noexcept -> const boost::filesystem::path & { + return this->file_descriptor.local_path; + } + inline auto SetLocalPath(const boost::filesystem::path &local_path) noexcept -> StatusCode { if (GetRole() != "File") { + CAOSDB_LOG_WARN(logger_name) + << "Entity::SetLocalPath failed. This is not a file entity."; return StatusCode::NOT_A_FILE_ENTITY; } if (!exists(local_path)) { + CAOSDB_LOG_WARN(logger_name) + << "Entity::SetLocalPath failed. This file does not exists: " + << local_path.string(); return StatusCode::FILE_DOES_NOT_EXIST_LOCALLY; } if (is_directory(local_path)) { + CAOSDB_LOG_WARN(logger_name) + << "Entity::SetLocalPath failed. This file is a directory: " + << local_path.string(); return StatusCode::PATH_IS_A_DIRECTORY; } + CAOSDB_LOG_TRACE(logger_name) + << "Entity::SetLocalPath(" << local_path.string() << ");"; this->file_descriptor.local_path = local_path; return StatusCode::SUCCESS; } @@ -534,7 +548,6 @@ private: static auto CreateMessagesField() -> RepeatedPtrField<ProtoMessage> *; auto SetId(const std::string &id) -> void; auto SetVersionId(const std::string &id) -> void; - FileTransmissionId *file_transmission_id = nullptr; FileDescriptor file_descriptor; ProtoEntity *wrapped; Properties properties; diff --git a/include/caosdb/filestreaming/FileWriter.h b/include/caosdb/filestreaming/FileWriter.h index 81a2ae3..1ef9bd5 100644 --- a/include/caosdb/filestreaming/FileWriter.h +++ b/include/caosdb/filestreaming/FileWriter.h @@ -1,6 +1,10 @@ #pragma once -#include "FileLock.h" +#include "caosdb/filestreaming/FileLock.h" + +#include <boost/filesystem.hpp> +#include <boost/filesystem/fstream.hpp> +#include <boost/filesystem/string_file.hpp> #include <fstream> #include <memory> @@ -10,8 +14,8 @@ namespace FileExchange { class FileWriter final { public: - FileWriter(const std::string &filename); - FileWriter(const std::string &filename, + FileWriter(const boost::filesystem::path &filename); + FileWriter(const boost::filesystem::path &filename, const std::shared_ptr<FileMutex> &mutexPtr); ~FileWriter() = default; @@ -28,7 +32,7 @@ private: void openFile(); std::ofstream stream_; - std::string filename_; + boost::filesystem::path filename_; std::shared_ptr<FileMutex> mutexPtr_; FileWriteLock lock_; diff --git a/include/caosdb/protobuf_helper.h b/include/caosdb/protobuf_helper.h index ce7dd5e..838908e 100644 --- a/include/caosdb/protobuf_helper.h +++ b/include/caosdb/protobuf_helper.h @@ -23,6 +23,14 @@ #define CAOSDB_PROTOBUF_HELPER_H #include <google/protobuf/arena.h> +#include <google/protobuf/util/json_util.h> // for MessageToJsonString, Jso... + +#define CAOSDB_DEBUG_MESSAGE_STRING(message, out) \ + std::string out; \ + { \ + google::protobuf::util::JsonOptions options; \ + google::protobuf::util::MessageToJsonString(message, &out, options); \ + } namespace caosdb::utility { diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index 1973f44..3fd1e52 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -305,6 +305,8 @@ public: return *(this->entity); } + friend Transaction; + private: std::unique_ptr<Entity> entity; }; @@ -342,7 +344,8 @@ public: * If the file cannot be downloaded due to unsufficient permissions an error * is appended. */ - auto RetrieveAndDownloadFilesById(const std::string &id) noexcept + auto RetrieveAndDownloadFilesById(const std::string &id, + const std::string &local_path) noexcept -> StatusCode; /** @@ -487,7 +490,7 @@ public: } std::vector<FileDescriptor> upload_files; - std::vector<FileDescriptor> download_files; + std::map<std::string, FileDescriptor> download_files; private: auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void; diff --git a/include/caosdb/transaction_status.h b/include/caosdb/transaction_status.h index e3bdf0b..c018ef5 100644 --- a/include/caosdb/transaction_status.h +++ b/include/caosdb/transaction_status.h @@ -127,6 +127,14 @@ public: */ CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(FILE_UPLOAD_ERROR, StatusCode::FILE_UPLOAD_ERROR); + /** + * Factory for a FILE_DOWN_ERROR status. + * + * This status means that the transaction failed during the download of the + * file blobs of file entities. + */ + CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(FILE_DOWNLOAD_ERROR, + StatusCode::FILE_DOWNLOAD_ERROR); /** * Factory for a TRANSACTION_ERROR status. * diff --git a/src/caosdb/configuration.cpp b/src/caosdb/configuration.cpp index 76c4372..5ebda79 100644 --- a/src/caosdb/configuration.cpp +++ b/src/caosdb/configuration.cpp @@ -302,8 +302,9 @@ auto LoggingConfigurationHelper::CreateSinkConfiguration( assert(from.contains("destination")); const auto &destination = std::string(from.at("destination").as_string().c_str()); + int level = from.contains("level") - ? static_cast<int>(from.at("level").as_int64()) + ? ConvertLogLevel(from.at("level").as_string().c_str()) : default_level; if (destination == "file") { @@ -317,38 +318,43 @@ auto LoggingConfigurationHelper::CreateSinkConfiguration( } } -auto LoggingConfigurationHelper::CreateLoggingConfiguration( - const object &from) const -> LoggingConfiguration { - auto default_level_str = from.contains("level") - ? std::string(from.at("level").as_string().c_str()) - : ""; - int default_level = 0; +auto LoggingConfigurationHelper::ConvertLogLevel( + const std::string &string_level) const -> int { static std::map<std::string, int> log_level_names = { {"", CAOSDB_DEFAULT_LOG_LEVEL}, {"off", CAOSDB_LOG_LEVEL_OFF}, {"fatal", CAOSDB_LOG_LEVEL_FATAL}, {"error", CAOSDB_LOG_LEVEL_ERROR}, {"warn", CAOSDB_LOG_LEVEL_WARN}, {"info", CAOSDB_LOG_LEVEL_INFO}, {"debug", CAOSDB_LOG_LEVEL_DEBUG}, {"trace", CAOSDB_LOG_LEVEL_TRACE}, {"all", CAOSDB_LOG_LEVEL_ALL}}; - try { - default_level = CAOSDB_DEFAULT_LOG_LEVEL; + return log_level_names.at(string_level); } catch (const std::out_of_range &exc) { - throw ConfigurationError("Unknown log level: " + default_level_str); + throw ConfigurationError("Unknown log level: " + string_level); } +} + +auto LoggingConfigurationHelper::CreateLoggingConfiguration( + const object &from) const -> LoggingConfiguration { + auto default_level_str = from.contains("level") + ? std::string(from.at("level").as_string().c_str()) + : ""; + int default_level = ConvertLogLevel(default_level_str); auto result = LoggingConfiguration(default_level); if (default_level == CAOSDB_LOG_LEVEL_OFF) { return result; } - const auto &sinks = from.at("sinks").as_object(); - if (!sinks.empty()) { - const auto *elem = sinks.begin(); + if (from.contains("sinks")) { + const auto &sinks = from.at("sinks").as_object(); + if (!sinks.empty()) { + const auto *elem = sinks.begin(); - while (elem != sinks.end()) { - result.AddSink(CreateSinkConfiguration( - elem->value().as_object(), elem->key().to_string(), default_level)); - elem = std::next(elem); + while (elem != sinks.end()) { + result.AddSink(CreateSinkConfiguration( + elem->value().as_object(), elem->key().to_string(), default_level)); + elem = std::next(elem); + } } } diff --git a/src/caosdb/filestreaming/DownloadRequestHandler.cpp b/src/caosdb/filestreaming/DownloadRequestHandler.cpp index a485e4c..74be7e7 100644 --- a/src/caosdb/filestreaming/DownloadRequestHandler.cpp +++ b/src/caosdb/filestreaming/DownloadRequestHandler.cpp @@ -1,11 +1,16 @@ #include "caosdb/filestreaming/DownloadRequestHandler.h" #include "caosdb/protobuf_helper.h" - -#include <grpc/support/log.h> +#include "caosdb/exceptions.h" +#include "caosdb/status_code.h" +#include "caosdb/logging.h" #include <iostream> namespace FileExchange { +using caosdb::StatusCode; +using caosdb::exceptions::AuthenticationError; +using caosdb::exceptions::ConnectionError; +using caosdb::exceptions::Exception; using caosdb::utility::get_arena; using google::protobuf::Arena; @@ -38,10 +43,15 @@ bool DownloadRequestHandler::onNext(bool ok) { } return true; + } catch (Exception &e) { + throw; } catch (std::exception &e) { - gpr_log(GPR_ERROR, "Download processing error: %s", e.what()); + CAOSDB_LOG_ERROR(logger_name) << "Download processing error: " << e.what(); + throw; } catch (...) { - gpr_log(GPR_ERROR, "Download processing error: unknown exception caught"); + CAOSDB_LOG_ERROR(logger_name) + << "Download processing error: unknown exception caught"; + throw; } if (state_ == CallState::NewCall) { @@ -57,27 +67,47 @@ bool DownloadRequestHandler::onNext(bool ok) { void DownloadRequestHandler::cancel() { ctx_.TryCancel(); } void DownloadRequestHandler::handleNewCallState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleNewCallState. local_path = " + << file_descriptor_.local_path + << ", download_id = " << file_descriptor_.file_transmission_id; const std::size_t ServerDefaultChunkSize = 0; + fileWriter_ = std::make_unique<FileWriter>(file_descriptor_.local_path); + CAOSDB_DEBUG_MESSAGE_STRING(*request_, request_out) + CAOSDB_LOG_TRACE(logger_name) << "HERE1" << request_out; + CAOSDB_DEBUG_MESSAGE_STRING(*(file_descriptor_.file_transmission_id), + file_transmission_id_out) + CAOSDB_LOG_TRACE(logger_name) << "HERE2" << file_transmission_id_out; request_->mutable_file_transmission_id()->CopyFrom( *(file_descriptor_.file_transmission_id)); + CAOSDB_LOG_TRACE(logger_name) << "HERE2"; rpc_ = stub_->PrepareAsyncFileDownload(&ctx_, *request_, cq_); + CAOSDB_LOG_TRACE(logger_name) << "HERE3"; state_ = CallState::SendingRequest; rpc_->StartCall(tag_); + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleNewCallState"; } void DownloadRequestHandler::handleSendingRequestState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleSendingRequestState"; state_ = CallState::ReceivingFile; rpc_->Read(response_, tag_); + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleSendingRequestState"; } void DownloadRequestHandler::handleReceivingFileState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleReceivingFileState"; if (response_->has_chunk()) { auto &chunkData = response_->chunk().data(); if (chunkData.empty()) { - gpr_log(GPR_INFO, "Received an empty FileChunk, ignoring"); + CAOSDB_LOG_DEBUG(logger_name) << "Received an empty FileChunk, ignoring"; } else { fileWriter_->write(chunkData); bytesReceived_ += chunkData.size(); @@ -89,26 +119,31 @@ void DownloadRequestHandler::handleReceivingFileState() { } else { throw std::runtime_error("File chunk expected"); } + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleReceivingFileState"; } void DownloadRequestHandler::handleCallCompleteState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleCallCompleteState"; switch (status_.error_code()) { case grpc::OK: - std::cout << "[" << file_descriptor_.local_path - << "]: download complete: " << bytesReceived_ << " bytes received" - << std::endl; - break; - - case grpc::CANCELLED: - std::cout << "[" << file_descriptor_.local_path << "]: download cancelled" - << std::endl; + CAOSDB_LOG_INFO(logger_name) << "[" << file_descriptor_.local_path + << "]: download complete: " << bytesReceived_ + << " bytes received" << std::endl; break; + case grpc::UNAUTHENTICATED: + throw AuthenticationError(status_.error_message()); + case grpc::UNAVAILABLE: + throw ConnectionError(status_.error_message()); default: - std::cout << "[" << file_descriptor_.local_path - << "]: download failed: " << status_.error_message() << std::endl; - break; + throw Exception(StatusCode::GENERIC_RPC_ERROR, + "GRPC error code " + std::to_string(status_.error_code()) + + " - " + status_.error_message()); } + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleCallCompleteState"; } } // namespace FileExchange diff --git a/src/caosdb/filestreaming/FileWriter.cpp b/src/caosdb/filestreaming/FileWriter.cpp index 9c99331..ade045a 100644 --- a/src/caosdb/filestreaming/FileWriter.cpp +++ b/src/caosdb/filestreaming/FileWriter.cpp @@ -3,11 +3,12 @@ using namespace FileExchange; -FileWriter::FileWriter(const std::string &filename) : filename_(filename) { +FileWriter::FileWriter(const boost::filesystem::path &filename) + : filename_(filename) { this->openFile(); } -FileWriter::FileWriter(const std::string &filename, +FileWriter::FileWriter(const boost::filesystem::path &filename, const std::shared_ptr<FileMutex> &mutexPtr) : filename_(filename), mutexPtr_(mutexPtr) { this->openFile(); @@ -17,13 +18,13 @@ void FileWriter::openFile() { if (mutexPtr_) { lock_ = FileWriteLock(*mutexPtr_, std::try_to_lock); if (!lock_) { - throw FileLockError("Can't lock file for writing: " + filename_); + throw FileLockError("Can't lock file for writing: " + filename_.string()); } } stream_.open(filename_, std::ios::binary | std::ios::trunc); if (!stream_) { - throw FileIOError("Can't open file for writing: " + filename_); + throw FileIOError("Can't open file for writing: " + filename_.string()); } } @@ -31,7 +32,7 @@ void FileWriter::write(const std::string &buffer) { auto bufferSize = buffer.size(); if (bufferSize > 0) { if (!stream_.write(buffer.data(), bufferSize)) { - throw FileIOError("Can't write file: " + filename_); + throw FileIOError("Can't write file: " + filename_.string()); } } } diff --git a/src/caosdb/logging.cpp b/src/caosdb/logging.cpp index 3618826..3be4d38 100644 --- a/src/caosdb/logging.cpp +++ b/src/caosdb/logging.cpp @@ -156,6 +156,12 @@ auto initialize_logging_defaults() -> int { // Called if custom logging settings are specified. auto initialize_logging(const LoggingConfiguration &configuration) -> void { + // first: turn everything off + boost::log::settings off_settings; + off_settings["Core.DisableLogging"] = true; + boost::log::init_from_settings(off_settings); + + // now set everything up boost::log::settings new_settings; if (configuration.GetLevel() == CAOSDB_LOG_LEVEL_OFF) { diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 6f4155e..15f320a 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -32,10 +32,11 @@ #include "grpcpp/impl/codegen/status.h" // for Status #include "grpcpp/impl/codegen/status_code_enum.h" // for StatusCode, UNAUTH... #include <cassert> // for assert -#include <map> // for map -#include <memory> // for allocator, unique_ptr -#include <stdexcept> // for out_of_range -#include <utility> // for move +#include <google/protobuf/util/json_util.h> // for MessageToJsonString, Jso... +#include <map> // for map +#include <memory> // for allocator, unique_ptr +#include <stdexcept> // for out_of_range +#include <utility> // for move namespace caosdb { @@ -84,6 +85,10 @@ auto get_status_description(int code) -> const std::string & { {StatusCode::PATH_IS_A_DIRECTORY, "The given path is a directory."}, {StatusCode::FILE_DOES_NOT_EXIST_LOCALLY, "The file does not not exist in the local file system."}, + {StatusCode::FILE_DOWNLOAD_ERROR, + "The transaction failed during the download of the files"}, + {StatusCode::FILE_UPLOAD_ERROR, + "The transaction failed during the upload of the files"}, {StatusCode::UNSUPPORTED_FEATURE, "This feature is not available in the this client implementation."}}; try { @@ -139,8 +144,8 @@ auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode { return this->status.GetCode(); } -auto Transaction::RetrieveAndDownloadFilesById(const std::string &id) noexcept - -> StatusCode { +auto Transaction::RetrieveAndDownloadFilesById( + const std::string &id, const std::string &local_path) noexcept -> StatusCode { ASSERT_CAN_ADD_RETRIEVAL auto *retrieve_request = @@ -148,6 +153,8 @@ auto Transaction::RetrieveAndDownloadFilesById(const std::string &id) noexcept retrieve_request->set_id(id); retrieve_request->set_register_file_download(true); + download_files[id].local_path = local_path; + this->status = TransactionStatus::GO_ON(); return this->status.GetCode(); } @@ -303,6 +310,47 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { } else { this->status = TransactionStatus::SUCCESS(); } + + // file download afterwards + if (status.GetCode() == StatusCode::SUCCESS && download_files.size() > 0) { + // run over all retrieved entities and get the download_id + for (auto sub_response : *(response->mutable_responses())) { + if (sub_response.wrapped_response_case() == + TransactionResponseCase::kRetrieveResponse) { + if (sub_response.retrieve_response() + .entity_response() + .has_download_id()) { + auto entity_response = + sub_response.mutable_retrieve_response()->mutable_entity_response(); + auto entity_id = entity_response->entity().id(); + download_files[entity_id].file_transmission_id = + entity_response->release_download_id(); + // TODO(tf) handle error + } + } + } + + auto *registration_response = + Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); + RegisterUploadFile(registration_response); + + // TODO if(registration_response.status != REGISTRATION_STATUS_ACCEPTED){ + // return StatusCode::FILE_UPLOAD_FAILED + // } + FileExchangeClient download_client(file_service); + for (auto item : download_files) { + auto file_descriptor(item.second); + CAOSDB_DEBUG_MESSAGE_STRING(*file_descriptor.file_transmission_id, out) + CAOSDB_LOG_INFO(logger_name) + << "Downloading " << file_descriptor.local_path << ", " << out; + auto file_download_status = download_client.download(file_descriptor); + if (!file_download_status == StatusCode::SUCCESS) { + this->status = TransactionStatus::FILE_DOWNLOAD_ERROR(); + // TODO (tf) handle multiple errors? + return StatusCode::EXECUTING; + } + } + } return StatusCode::EXECUTING; } @@ -315,13 +363,23 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { switch (retrieve_response->wrapped_response_case()) { case QueryResponseCase::kEntityResponse: { auto *entity = retrieve_response->release_entity_response(); + auto id = entity->entity().id(); if (!entity->errors().empty()) { this->status = TransactionStatus::TRANSACTION_ERROR( "The request returned with errors."); } + this->result_set = std::make_unique<UniqueResult>(entity); + if (!id.empty() && download_files.count(id) == 1) { + const auto &local_path = download_files.at(id).local_path; + const auto &unique_result_set = + dynamic_cast<const UniqueResult &>(*result_set.get()); + unique_result_set.entity->SetLocalPath(local_path); + } } break; case QueryResponseCase::kSelectResult: { + CAOSDB_LOG_ERROR(logger_name) << "Results of a SELECT query cannot be " + "processed by this client yet."; // TODO(tf) Select queries } break; case QueryResponseCase::kCountResult: { @@ -331,7 +389,7 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { std::make_unique<MultiResultSet>(std::move(entities)); } break; default: - // TODO(tf) Error + CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase."; break; } } break; @@ -363,12 +421,14 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { deletedIdResponse->release_id_response()); } break; default: - // TODO(tf) Error + CAOSDB_LOG_FATAL(logger_name) << "Received invalid WrappedResponseCase."; break; } } else { auto *responses = this->response->mutable_responses(); std::vector<std::unique_ptr<Entity>> entities; + CAOSDB_LOG_DEBUG(logger_name) + << "Number of subresponses: " << responses->size(); for (auto sub_response : *responses) { switch (sub_response.wrapped_response_case()) { case TransactionResponseCase::kRetrieveResponse: @@ -388,10 +448,18 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { sub_response.mutable_update_response()->release_id_response())); break; default: - // TODO(tf) Errors + CAOSDB_LOG_FATAL(logger_name) + << "Received invalid TransactionResponseCase."; break; } } + for (auto &entity : entities) { + auto id = entity->GetId(); + if (!id.empty() && download_files.count(id) == 1) { + const auto &local_path = download_files.at(id).local_path; + entity->SetLocalPath(local_path); + } + } this->result_set = std::make_unique<MultiResultSet>(std::move(entities)); } diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index 1ed3a07..ff64f3d 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -201,7 +201,7 @@ TEST(test_transaction, test_retrieve_and_download) { auto transaction = connection.CreateTransaction(); EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); - transaction->RetrieveAndDownloadFilesById("asdf"); + transaction->RetrieveAndDownloadFilesById("asdf", "local_path"); EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); -- GitLab