From cba3d3697555059ab9e17002cf98d9d71a4c5509 Mon Sep 17 00:00:00 2001 From: Timm Fitschen <t.fitschen@indiscale.com> Date: Tue, 10 Aug 2021 09:53:02 +0200 Subject: [PATCH] WIP: files --- include/CMakeLists.txt | 10 + include/caosdb/entity.h | 122 +++++--- include/caosdb/filestreaming/Client.h | 44 +++ .../filestreaming/DownloadRequestHandler.h | 60 ++++ include/caosdb/filestreaming/FileError.h | 24 ++ include/caosdb/filestreaming/FileLock.h | 12 + include/caosdb/filestreaming/FileManager.h | 53 ++++ include/caosdb/filestreaming/FileReader.h | 46 +++ include/caosdb/filestreaming/FileWriter.h | 37 +++ .../caosdb/filestreaming/HandlerInterface.h | 21 ++ include/caosdb/filestreaming/LICENSE | 21 ++ include/caosdb/filestreaming/RequestStatus.h | 19 ++ .../filestreaming/UploadRequestHandler.h | 67 +++++ include/caosdb/status_code.h | 5 + include/caosdb/transaction.h | 46 +-- include/caosdb/transaction_status.h | 8 + include/caosdb/utility.h | 2 + proto | 2 +- src/CMakeLists.txt | 7 + src/caosdb/entity.cpp | 40 +-- src/caosdb/filestreaming/Client.cpp | 84 ++++++ .../filestreaming/DownloadRequestHandler.cpp | 114 ++++++++ src/caosdb/filestreaming/FileManager.cpp | 59 ++++ src/caosdb/filestreaming/FileReader.cpp | 52 ++++ src/caosdb/filestreaming/FileWriter.cpp | 37 +++ src/caosdb/filestreaming/LICENSE | 21 ++ src/caosdb/filestreaming/RequestStatus.cpp | 19 ++ .../filestreaming/UploadRequestHandler.cpp | 141 ++++++++++ src/caosdb/transaction.cpp | 263 ++++++++++-------- test/caosdb_test_utility.h.in | 1 + test/test_data/test_caosdb_client.json | 6 - test/test_entity.cpp | 32 ++- test/test_transaction.cpp | 63 ++++- 33 files changed, 1317 insertions(+), 221 deletions(-) create mode 100644 include/caosdb/filestreaming/Client.h create mode 100644 include/caosdb/filestreaming/DownloadRequestHandler.h create mode 100644 include/caosdb/filestreaming/FileError.h create mode 100644 include/caosdb/filestreaming/FileLock.h create mode 100644 include/caosdb/filestreaming/FileManager.h create mode 100644 include/caosdb/filestreaming/FileReader.h create mode 100644 include/caosdb/filestreaming/FileWriter.h create mode 100644 include/caosdb/filestreaming/HandlerInterface.h create mode 100644 include/caosdb/filestreaming/LICENSE create mode 100644 include/caosdb/filestreaming/RequestStatus.h create mode 100644 include/caosdb/filestreaming/UploadRequestHandler.h create mode 100644 src/caosdb/filestreaming/Client.cpp create mode 100644 src/caosdb/filestreaming/DownloadRequestHandler.cpp create mode 100644 src/caosdb/filestreaming/FileManager.cpp create mode 100644 src/caosdb/filestreaming/FileReader.cpp create mode 100644 src/caosdb/filestreaming/FileWriter.cpp create mode 100644 src/caosdb/filestreaming/LICENSE create mode 100644 src/caosdb/filestreaming/RequestStatus.cpp create mode 100644 src/caosdb/filestreaming/UploadRequestHandler.cpp diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 542ad97..a37100e 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -36,6 +36,16 @@ set(libcaosdb_INCL ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_status.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/utility.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/Client.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/UploadRequestHandler.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/DownloadRequestHandler.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/HandlerInterface.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/RequestStatus.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileWriter.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileReader.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileLock.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileError.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileManager.h ) # pass variable to parent scope diff --git a/include/caosdb/entity.h b/include/caosdb/entity.h index 67dd71a..e612326 100644 --- a/include/caosdb/entity.h +++ b/include/caosdb/entity.h @@ -29,31 +29,35 @@ #ifndef CAOSDB_ENTITY_H #define CAOSDB_ENTITY_H -#include <string> // for string -#include "caosdb/entity/v1alpha1/main.pb.h" // for RepeatedPtrField, Message +#include "caosdb/entity/v1alpha1/main.pb.h" // for ProtoEntity, ProtoParent... +#include "caosdb/status_code.h" +#include "google/protobuf/message.h" // for RepeatedPtrField #include "caosdb/message_code.h" // for get_message_code, Messag... #include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso... -#include "boost/filesystem/path.hpp" // for path +#include <boost/filesystem/path.hpp> // for path +#include <boost/filesystem.hpp> +#include <random> +#include <string> // for string +#include <stdexcept> // for out_of_range namespace caosdb::entity { +using boost::filesystem::exists; +using boost::filesystem::is_directory; using caosdb::entity::v1alpha1::IdResponse; using ProtoParent = caosdb::entity::v1alpha1::Parent; using ProtoProperty = caosdb::entity::v1alpha1::Property; using ProtoEntity = caosdb::entity::v1alpha1::Entity; +using ProtoFileDescriptor = caosdb::entity::v1alpha1::FileDescriptor; +using ProtoMessage = caosdb::entity::v1alpha1::Message; +using caosdb::StatusCode; +using caosdb::entity::v1alpha1::EntityResponse; using caosdb::entity::v1alpha1::FileTransmissionId; +using google::protobuf::RepeatedPtrField; -class FileDescriptor { -public: - auto GetEntityId() const -> const std::string &; - auto GetLocalPath() const -> const boost::filesystem::path &; - auto GetRemotePath() const -> const std::string &; - auto GetSize() const -> long long; - -private: - std::string entity_id; - std::string remote_path; - std::string local_path; - long long size; +struct FileDescriptor { + FileTransmissionId *file_transmission_id; + ProtoFileDescriptor *wrapped; + boost::filesystem::path local_path; }; /** @@ -79,10 +83,9 @@ public: friend class Messages; private: - explicit inline Message(caosdb::entity::v1alpha1::Message *wrapped) - : wrapped(wrapped){}; + explicit inline Message(ProtoMessage *wrapped) : wrapped(wrapped){}; - caosdb::entity::v1alpha1::Message *wrapped; + ProtoMessage *wrapped; }; /** @@ -90,8 +93,16 @@ private: */ class Messages { public: - [[nodiscard]] inline auto Size() const -> int { return wrapped->size(); } + [[nodiscard]] inline auto Size() const -> int { + if (wrapped == nullptr) { + return 0; + } + return wrapped->size(); + } [[nodiscard]] inline auto At(int index) const -> const Message { + if (wrapped == nullptr) { + throw std::out_of_range("Number of messages: 0"); + } return Message(&(wrapped->at(index))); } @@ -103,8 +114,7 @@ public: private: inline Messages() : wrapped(nullptr){}; - ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Message> - *wrapped; + RepeatedPtrField<ProtoMessage> *wrapped; }; /** @@ -232,8 +242,7 @@ public: private: inline Parents(){}; explicit inline Parents( - ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Parent> - *wrapped) + RepeatedPtrField<caosdb::entity::v1alpha1::Parent> *wrapped) : wrapped(wrapped){}; /** @@ -246,8 +255,7 @@ private: * The collection of parent messages which serves as a backend for this * class. */ - ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Parent> - *wrapped; + RepeatedPtrField<caosdb::entity::v1alpha1::Parent> *wrapped; }; /** @@ -369,8 +377,7 @@ public: private: inline Properties(){}; explicit inline Properties( - ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Property> - *wrapped) + RepeatedPtrField<caosdb::entity::v1alpha1::Property> *wrapped) : wrapped(wrapped){}; /** @@ -380,8 +387,7 @@ private: */ auto Append(const Property &property) -> void; - ::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Property> - *wrapped; + RepeatedPtrField<caosdb::entity::v1alpha1::Property> *wrapped; }; /** @@ -394,12 +400,18 @@ public: this->wrapped->CopyFrom(*original.wrapped); }; explicit Entity(IdResponse *idResponse); - explicit inline Entity(ProtoEntity *wrapped) : wrapped(wrapped) { - errors.wrapped = this->wrapped->mutable_errors(); - warnings.wrapped = this->wrapped->mutable_warnings(); - infos.wrapped = this->wrapped->mutable_infos(); + explicit Entity(ProtoEntity *wrapped) : wrapped(wrapped) { properties.wrapped = this->wrapped->mutable_properties(); parents.wrapped = this->wrapped->mutable_parents(); + errors.wrapped = CreateMessagesField(); + warnings.wrapped = CreateMessagesField(); + infos.wrapped = CreateMessagesField(); + }; + explicit inline Entity(EntityResponse *response) + : Entity(response->release_entity()) { + errors.wrapped->Swap(response->mutable_errors()); + warnings.wrapped->Swap(response->mutable_warnings()); + infos.wrapped->Swap(response->mutable_infos()); }; [[nodiscard]] inline auto GetId() const noexcept -> const std::string & { @@ -471,21 +483,59 @@ public: */ auto CopyTo(ProtoEntity *target) -> void; - auto SetFileTransmissionId(const std::string ®istration_id, - const std::string &file_id) -> void; auto SetFilePath(const std::string &path) -> void; inline auto GetFileTransmissionId() const -> const FileTransmissionId & { return *(this->file_transmission_id); } inline auto HasFile() const -> bool { - return this->file_transmission_id != nullptr; + return !this->file_descriptor.local_path.empty(); + } + auto SetFileTransmissionRegistrationId(const std::string ®istration_id) + -> void; + inline auto SetFileTransmissionId(FileTransmissionId *file_transmission_id) + -> void { + file_transmission_id->set_file_id(GetNextFileId()); + file_descriptor.file_transmission_id = file_transmission_id; + } + + inline auto GetFileDescriptor() -> FileDescriptor & { + return this->file_descriptor; + } + + inline auto SetLocalPath(const boost::filesystem::path &local_path) noexcept + -> StatusCode { + if (GetRole() != "File") { + return StatusCode::NOT_A_FILE_ENTITY; + } + if (!exists(local_path)) { + return StatusCode::FILE_DOES_NOT_EXIST_LOCALLY; + } + if (is_directory(local_path)) { + return StatusCode::PATH_IS_A_DIRECTORY; + } + + this->file_descriptor.local_path = local_path; + return StatusCode::SUCCESS; } private: + inline auto GetNextFileId() -> std::string { + std::string str = "0123456789abcdef"; + std::mt19937 generator(std::random_device{}()); + std::uniform_int_distribution<int> distribution(0, str.size() - 1); + std::string result(10, '\0'); + + for (auto &dis : result) { + dis = str[distribution(generator)]; + } + return result; + } static auto CreateProtoEntity() -> ProtoEntity *; + static auto CreateMessagesField() -> RepeatedPtrField<ProtoMessage> *; auto SetId(const std::string &id) -> void; auto SetVersionId(const std::string &id) -> void; FileTransmissionId *file_transmission_id = nullptr; + FileDescriptor file_descriptor; ProtoEntity *wrapped; Properties properties; Parents parents; diff --git a/include/caosdb/filestreaming/Client.h b/include/caosdb/filestreaming/Client.h new file mode 100644 index 0000000..28507bf --- /dev/null +++ b/include/caosdb/filestreaming/Client.h @@ -0,0 +1,44 @@ +#include "caosdb/filestreaming/HandlerInterface.h" +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" +#include "caosdb/entity.h" +#include "caosdb/status_code.h" + +#include <grpcpp/grpcpp.h> + +#include <iostream> +#include <memory> +#include <string> + +namespace FileExchange { +using caosdb::StatusCode; +using caosdb::entity::FileDescriptor; +using caosdb::entity::v1alpha1::FileTransmissionService; + +class FileExchangeClient final { +public: + FileExchangeClient( + const std::shared_ptr<FileTransmissionService::Stub> &service) + : stub_(service) {} + + ~FileExchangeClient(); + + FileExchangeClient(const FileExchangeClient &) = delete; + FileExchangeClient &operator=(const FileExchangeClient &) = delete; + FileExchangeClient(FileExchangeClient &&) = delete; + FileExchangeClient &operator=(FileExchangeClient &&) = delete; + + StatusCode upload(const FileDescriptor &file_descriptor); + StatusCode download(const FileDescriptor &file_descriptor); + + void cancel(); + +private: + int processMessages(); + + grpc::CompletionQueue cq_; + + std::shared_ptr<FileTransmissionService::Stub> stub_; + std::unique_ptr<HandlerInterface> handler_; +}; + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/DownloadRequestHandler.h b/include/caosdb/filestreaming/DownloadRequestHandler.h new file mode 100644 index 0000000..dbb0d2e --- /dev/null +++ b/include/caosdb/filestreaming/DownloadRequestHandler.h @@ -0,0 +1,60 @@ +#include "caosdb/filestreaming/HandlerInterface.h" +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" +#include "caosdb/entity.h" +#include "caosdb/filestreaming/FileWriter.h" + +#include <grpcpp/grpcpp.h> + +namespace FileExchange { +using caosdb::entity::FileDescriptor; +using caosdb::entity::v1alpha1::FileDownloadRequest; +using caosdb::entity::v1alpha1::FileDownloadResponse; +using caosdb::entity::v1alpha1::FileTransmissionService; + +class DownloadRequestHandler : public HandlerInterface { +public: + DownloadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, + const FileDescriptor &file_descriptor); + + ~DownloadRequestHandler() override = default; + + DownloadRequestHandler(const DownloadRequestHandler &) = delete; + DownloadRequestHandler &operator=(const DownloadRequestHandler &) = delete; + DownloadRequestHandler(DownloadRequestHandler &&) = delete; + DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete; + + bool onNext(bool ok) override; + + void cancel() override; + +protected: + enum class CallState { NewCall, SendingRequest, ReceivingFile, CallComplete }; + + void handleNewCallState(); + void handleSendingRequestState(); + void handleReceivingFileState(); + void handleCallCompleteState(); + + HandlerTag tag_; + + FileTransmissionService::Stub *stub_; + grpc::CompletionQueue *cq_; + grpc::ClientContext ctx_; + + std::unique_ptr<grpc::ClientAsyncReader<FileDownloadResponse>> rpc_; + + FileDownloadRequest *request_; + FileDownloadResponse *response_; + grpc::Status status_; + + CallState state_; + + std::unique_ptr<FileWriter> fileWriter_; + + FileDescriptor file_descriptor_; + + unsigned long long bytesReceived_; +}; + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/FileError.h b/include/caosdb/filestreaming/FileError.h new file mode 100644 index 0000000..609fd39 --- /dev/null +++ b/include/caosdb/filestreaming/FileError.h @@ -0,0 +1,24 @@ +#pragma once + +#include <stdexcept> +#include <string> + +namespace FileExchange { + +class FileLockError : public std::runtime_error { +public: + FileLockError(const std::string &message) : std::runtime_error(message) {} +}; + +class FileIOError : public std::runtime_error { +public: + FileIOError(const std::string &message) : std::runtime_error(message) {} +}; + +class FileNotManagedError : public std::runtime_error { +public: + FileNotManagedError(const std::string &message) + : std::runtime_error(message) {} +}; + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/FileLock.h b/include/caosdb/filestreaming/FileLock.h new file mode 100644 index 0000000..5fcf4d2 --- /dev/null +++ b/include/caosdb/filestreaming/FileLock.h @@ -0,0 +1,12 @@ +#pragma once + +#include <mutex> +#include <shared_mutex> + +namespace FileExchange { + +using FileMutex = std::shared_timed_mutex; +using FileReadLock = std::shared_lock<FileMutex>; +using FileWriteLock = std::unique_lock<FileMutex>; + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/FileManager.h b/include/caosdb/filestreaming/FileManager.h new file mode 100644 index 0000000..2f67e43 --- /dev/null +++ b/include/caosdb/filestreaming/FileManager.h @@ -0,0 +1,53 @@ +#pragma once + +#include "FileLock.h" +#include "FileReader.h" +#include "FileWriter.h" + +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +namespace FileExchange { + +class FileManager final { +public: + FileManager() = default; + + FileManager(const std::string &filename); + + template <class InputIt> FileManager(InputIt first, InputIt last); + + ~FileManager() = default; + + // FileManager is not copyable + FileManager(const FileManager &) = delete; + FileManager &operator=(const FileManager &) = delete; + + // FileManager is movable + FileManager(FileManager &&) = default; + FileManager &operator=(FileManager &&) = default; + + bool isManaged(const std::string &filename) const; + + std::unique_ptr<FileReader> readFile(const std::string &filename) const; + std::unique_ptr<FileWriter> writeFile(const std::string &filename); + +private: + void manageFile(const std::string &filename); + + mutable std::unordered_map<std::string, std::shared_ptr<FileMutex>> + fileMutexes_; + mutable std::mutex mgrMutex_; +}; + +template <class InputIt> FileManager::FileManager(InputIt first, InputIt last) { + while (first != last) { + const auto &filename = *first; + this->manageFile(filename); + ++first; + } +} + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/FileReader.h b/include/caosdb/filestreaming/FileReader.h new file mode 100644 index 0000000..3d12d42 --- /dev/null +++ b/include/caosdb/filestreaming/FileReader.h @@ -0,0 +1,46 @@ +#pragma once + +#include "caosdb/filestreaming/FileLock.h" + +#include <boost/filesystem.hpp> +#include <boost/filesystem/fstream.hpp> +#include <boost/filesystem/string_file.hpp> +#include <fstream> +#include <memory> +#include <string> + +namespace FileExchange { +using boost::filesystem::exists; +using boost::filesystem::ifstream; +using boost::filesystem::path; + +class FileReader final { +public: + FileReader(const boost::filesystem::path &filename); + FileReader(const boost::filesystem::path &filename, + const std::shared_ptr<FileMutex> &mutexPtr); + + ~FileReader() = default; + + FileReader(const FileReader &) = delete; + FileReader &operator=(const FileReader &) = delete; + + FileReader(FileReader &&) = default; + FileReader &operator=(FileReader &&) = default; + + unsigned long long fileSize() const { return size_; } + + std::size_t read(std::string &buffer); + +private: + void openFile(); + + std::ifstream stream_; + boost::filesystem::path filename_; + unsigned long long size_; + + std::shared_ptr<FileMutex> mutexPtr_; + FileReadLock lock_; +}; + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/FileWriter.h b/include/caosdb/filestreaming/FileWriter.h new file mode 100644 index 0000000..81a2ae3 --- /dev/null +++ b/include/caosdb/filestreaming/FileWriter.h @@ -0,0 +1,37 @@ +#pragma once + +#include "FileLock.h" + +#include <fstream> +#include <memory> +#include <string> + +namespace FileExchange { + +class FileWriter final { +public: + FileWriter(const std::string &filename); + FileWriter(const std::string &filename, + const std::shared_ptr<FileMutex> &mutexPtr); + + ~FileWriter() = default; + + FileWriter(const FileWriter &) = delete; + FileWriter &operator=(const FileWriter &) = delete; + + FileWriter(FileWriter &&) = default; + FileWriter &operator=(FileWriter &&) = default; + + void write(const std::string &buffer); + +private: + void openFile(); + + std::ofstream stream_; + std::string filename_; + + std::shared_ptr<FileMutex> mutexPtr_; + FileWriteLock lock_; +}; + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/HandlerInterface.h b/include/caosdb/filestreaming/HandlerInterface.h new file mode 100644 index 0000000..e1ab99d --- /dev/null +++ b/include/caosdb/filestreaming/HandlerInterface.h @@ -0,0 +1,21 @@ +#pragma once + +#include <memory> +#include <string> + +namespace FileExchange { +const std::string logger_name("caosdb::transaction::file_transmission"); + +class HandlerInterface { +public: + virtual ~HandlerInterface() = default; + + virtual bool onNext(bool ok) = 0; + + virtual void cancel() = 0; +}; + +using HandlerPtr = std::unique_ptr<HandlerInterface>; +using HandlerTag = HandlerPtr *; + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/LICENSE b/include/caosdb/filestreaming/LICENSE new file mode 100644 index 0000000..83d0cd5 --- /dev/null +++ b/include/caosdb/filestreaming/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 NeiRo21 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/include/caosdb/filestreaming/RequestStatus.h b/include/caosdb/filestreaming/RequestStatus.h new file mode 100644 index 0000000..dad810d --- /dev/null +++ b/include/caosdb/filestreaming/RequestStatus.h @@ -0,0 +1,19 @@ +#pragma once + +#include <grpcpp/grpcpp.h> + +namespace FileExchange { + +namespace RequestStatus { + +extern const grpc::Status FileHeaderExpected; +extern const grpc::Status FileNameEmpty; +extern const grpc::Status FileChunkExpected; +extern const grpc::Status FileNotFound; +extern const grpc::Status FileLocked; +extern const grpc::Status FileIOError; +extern const grpc::Status UnknownError; + +} // namespace RequestStatus + +} // namespace FileExchange diff --git a/include/caosdb/filestreaming/UploadRequestHandler.h b/include/caosdb/filestreaming/UploadRequestHandler.h new file mode 100644 index 0000000..4c35f59 --- /dev/null +++ b/include/caosdb/filestreaming/UploadRequestHandler.h @@ -0,0 +1,67 @@ +#include "caosdb/filestreaming/HandlerInterface.h" +#include "caosdb/entity.h" +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" +#include "caosdb/filestreaming/FileManager.h" + +#include <grpcpp/grpcpp.h> + +namespace FileExchange { +using caosdb::entity::FileDescriptor; +using caosdb::entity::v1alpha1::FileTransmissionService; +using caosdb::entity::v1alpha1::FileUploadRequest; +using caosdb::entity::v1alpha1::FileUploadResponse; + +class UploadRequestHandler : public HandlerInterface { +public: + UploadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, + const FileDescriptor &file_descriptor); + + ~UploadRequestHandler() override = default; + + UploadRequestHandler(const UploadRequestHandler &) = delete; + UploadRequestHandler &operator=(const UploadRequestHandler &) = delete; + UploadRequestHandler(UploadRequestHandler &&) = delete; + UploadRequestHandler &operator=(UploadRequestHandler &&) = delete; + + bool onNext(bool ok) override; + + void cancel() override; + +protected: + enum class CallState { + NewCall, + SendingHeader, + SendingFile, + ExpectingResponse, + CallComplete + }; + + void handleNewCallState(); + void handleSendingHeaderState(); + void handleSendingFileState(); + void handleExpectingResponseState(); + void handleCallCompleteState(); + + HandlerTag tag_; + + FileTransmissionService::Stub *stub_; + grpc::CompletionQueue *cq_; + grpc::ClientContext ctx_; + + std::unique_ptr<grpc::ClientAsyncWriter<FileUploadRequest>> rpc_; + + FileUploadRequest *request_; + FileUploadResponse *response_; + grpc::Status status_; + + CallState state_; + + std::unique_ptr<FileReader> fileReader_; + + FileDescriptor file_descriptor_; + + unsigned long long bytesToSend_; +}; + +} // namespace FileExchange diff --git a/include/caosdb/status_code.h b/include/caosdb/status_code.h index a6fcda9..fe42d7e 100644 --- a/include/caosdb/status_code.h +++ b/include/caosdb/status_code.h @@ -52,6 +52,11 @@ enum StatusCode { TRANSACTION_TYPE_ERROR = 26, UNSUPPORTED_FEATURE = 27, ORIGINAL_ENTITY_MISSING_ID = 28, + NOT_A_FILE_ENTITY = 29, + PATH_IS_A_DIRECTORY = 30, + FILE_DOES_NOT_EXIST_LOCALLY = 31, + FILE_UPLOAD_ERROR = 32, + FILE_DOWNLOAD_ERROR = 33, }; auto get_status_description(int code) -> const std::string &; diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index a6d2be0..1973f44 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -160,17 +160,18 @@ namespace caosdb::transaction { using caosdb::entity::Entity; using caosdb::entity::FileDescriptor; -using ProtoEntity = caosdb::entity::v1alpha1::Entity; +using caosdb::entity::v1alpha1::EntityResponse; using caosdb::entity::v1alpha1::EntityTransactionService; +using caosdb::entity::v1alpha1::FileDownloadRequest; using caosdb::entity::v1alpha1::FileDownloadResponse; +using caosdb::entity::v1alpha1::FileTransmissionId; using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::entity::v1alpha1::FileUploadRequest; using caosdb::entity::v1alpha1::FileUploadResponse; using caosdb::entity::v1alpha1::IdResponse; using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionResponse; -using caosdb::entity::v1alpha1::RegisterFileDownloadRequest; -using caosdb::entity::v1alpha1::RegisterFileDownloadResponse; +using caosdb::entity::v1alpha1::RegisterFileUploadRequest; using caosdb::entity::v1alpha1::RegisterFileUploadResponse; using caosdb::transaction::TransactionStatus; using WrappedResponseCase = @@ -242,10 +243,11 @@ template <class T> auto ResultSet<T>::end() const -> ResultSet<T>::iterator { return ResultSet<T>::iterator(this, Size()); } -template <class T> class IMultiResultSet : public ResultSet<T> { +template <class T> class AbstractMultiResultSet : public ResultSet<T> { public: - virtual ~IMultiResultSet() = default; - inline explicit IMultiResultSet(std::vector<std::unique_ptr<T>> result_set) + virtual ~AbstractMultiResultSet() = default; + inline explicit AbstractMultiResultSet( + std::vector<std::unique_ptr<T>> result_set) : items(std::move(result_set)) {} [[nodiscard]] inline auto Size() const noexcept -> int override { return this->items.size(); @@ -258,7 +260,10 @@ protected: std::vector<std::unique_ptr<T>> items; }; -class FilesResultSet : public IMultiResultSet<FileDescriptor> { +/** + * Container of files which have been downloaded during a transaction. + */ +class FilesResultSet : public AbstractMultiResultSet<FileDescriptor> { public: ~FilesResultSet() = default; explicit FilesResultSet( @@ -271,7 +276,7 @@ public: * In contrast to UniqueResult, this one can also hold multiple entities or zero * entities. */ -class MultiResultSet : public IMultiResultSet<Entity> { +class MultiResultSet : public AbstractMultiResultSet<Entity> { public: ~MultiResultSet() = default; explicit MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set); @@ -286,8 +291,8 @@ public: class UniqueResult : public ResultSet<Entity> { public: ~UniqueResult() = default; - explicit inline UniqueResult(ProtoEntity *protoEntity) - : entity(new Entity(protoEntity)){}; + explicit inline UniqueResult(EntityResponse *entityResponse) + : entity(new Entity(entityResponse)){}; explicit inline UniqueResult(IdResponse *idResponse) : entity(new Entity(idResponse)){}; [[nodiscard]] auto GetEntity() const -> const Entity &; @@ -309,14 +314,6 @@ private: */ class Transaction { public: - auto UploadFile(FileUploadResponse *response, - const std::string ®istration_id) -> void; - auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void; - auto DownloadFile(FileDownloadResponse *response, - const RegisterFileDownloadResponse ®istration_response) - -> void; - auto RegisterDownloadFile(const RegisterFileDownloadRequest &request, - RegisterFileDownloadResponse *response) -> void; /** * The transaction type restricts the kind of sub-transaction which may be * added to a transaction (insertion, update, deletion, retrieval). @@ -347,10 +344,6 @@ public: */ auto RetrieveAndDownloadFilesById(const std::string &id) noexcept -> StatusCode; - auto DownloadFilesById(const std::string &path) noexcept -> StatusCode; - auto RetrieveAndDownloadFilesByQuery(const std::string &query) noexcept - -> StatusCode; - auto DownloadFilesByQuery(const std::string &query) noexcept -> StatusCode; /** * Return a ResultSet<FileDescriptor>. @@ -493,7 +486,16 @@ public: return out; } + std::vector<FileDescriptor> upload_files; + std::vector<FileDescriptor> download_files; + private: + auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void; + auto UploadFile(FileUploadResponse *response, + const FileDescriptor &file_descriptor, + const std::string ®istration_id) -> void; + auto DownloadFile(FileDownloadResponse *response, + const FileTransmissionId &file_transmission_id) -> void; bool has_query = false; TransactionType transaction_type = TransactionType::NONE; mutable std::unique_ptr<ResultSet<Entity>> result_set; diff --git a/include/caosdb/transaction_status.h b/include/caosdb/transaction_status.h index 6f2f5ce..e3bdf0b 100644 --- a/include/caosdb/transaction_status.h +++ b/include/caosdb/transaction_status.h @@ -119,6 +119,14 @@ public: caosdb::get_status_description(StatusCode::AUTHENTICATION_ERROR) + " Original error: " + details); } + /** + * Factory for a FILE_UPLOAD_ERROR status. + * + * This status means that the transaction failed during the upload of the + * file blobs of file entities. + */ + CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(FILE_UPLOAD_ERROR, + StatusCode::FILE_UPLOAD_ERROR); /** * Factory for a TRANSACTION_ERROR status. * diff --git a/include/caosdb/utility.h b/include/caosdb/utility.h index 30e2174..dd53ff9 100644 --- a/include/caosdb/utility.h +++ b/include/caosdb/utility.h @@ -33,6 +33,8 @@ #include <memory> #include <string> #include <string_view> +#include <mutex> +#include <shared_mutex> namespace caosdb::utility { using boost::filesystem::exists; diff --git a/proto b/proto index d78d4c7..ab54941 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit d78d4c76f5cd08dd418e1ab42183d1172cb0b383 +Subproject commit ab5494165946c032325a2d37dec1d563ffc8a959 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 95b2dab..7d35fad 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -28,6 +28,13 @@ set(libcaosdb_SRC ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/configuration.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/protobuf_helper.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/Client.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/UploadRequestHandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/DownloadRequestHandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/RequestStatus.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileWriter.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileReader.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileManager.cpp ) # pass variable to parent scope diff --git a/src/caosdb/entity.cpp b/src/caosdb/entity.cpp index fbd2caa..23984f2 100644 --- a/src/caosdb/entity.cpp +++ b/src/caosdb/entity.cpp @@ -29,7 +29,10 @@ using caosdb::entity::v1alpha1::IdResponse; using ProtoParent = caosdb::entity::v1alpha1::Parent; using ProtoProperty = caosdb::entity::v1alpha1::Property; using ProtoEntity = caosdb::entity::v1alpha1::Entity; +using ProtoMessage = caosdb::entity::v1alpha1::Message; +using ProtoFileDescriptor = caosdb::entity::v1alpha1::FileDescriptor; using caosdb::utility::get_arena; +using google::protobuf::Arena; Parent::Parent() : wrapped(Parent::CreateProtoParent()) { // TODO(fspreck) Re-enable once we have decided how to attach @@ -40,7 +43,7 @@ Parent::Parent() : wrapped(Parent::CreateProtoParent()) { } auto Parent::CreateProtoParent() -> ProtoParent * { - return google::protobuf::Arena::CreateMessage<ProtoParent>(get_arena()); + return Arena::CreateMessage<ProtoParent>(get_arena()); } auto Parent::SetName(const std::string &name) -> void { @@ -71,7 +74,7 @@ auto Parents::Append(const Parent &parent) -> void { Property::Property() : wrapped(Property::CreateProtoProperty()) {} auto Property::CreateProtoProperty() -> ProtoProperty * { - return google::protobuf::Arena::CreateMessage<ProtoProperty>(get_arena()); + return Arena::CreateMessage<ProtoProperty>(get_arena()); } [[nodiscard]] auto Property::GetId() const -> const std::string & { @@ -152,23 +155,20 @@ auto Entity::AppendProperty(const Property &property) -> void { } auto Entity::CreateProtoEntity() -> ProtoEntity * { - return google::protobuf::Arena::CreateMessage<ProtoEntity>(get_arena()); + return Arena::CreateMessage<ProtoEntity>(get_arena()); } -Entity::Entity() : wrapped(Entity::CreateProtoEntity()) { - properties.wrapped = this->wrapped->mutable_properties(); - parents.wrapped = this->wrapped->mutable_parents(); - errors.wrapped = this->wrapped->mutable_errors(); - warnings.wrapped = this->wrapped->mutable_warnings(); - infos.wrapped = this->wrapped->mutable_infos(); +auto Entity::CreateMessagesField() -> RepeatedPtrField<ProtoMessage> * { + return Arena::CreateMessage<RepeatedPtrField<ProtoMessage>>(get_arena()); } +Entity::Entity() : Entity(Entity::CreateProtoEntity()) {} + Entity::Entity(IdResponse *idResponse) : Entity() { this->wrapped->set_id(idResponse->id()); - this->wrapped->mutable_errors()->Swap(idResponse->mutable_entity_errors()); - this->wrapped->mutable_warnings()->Swap( - idResponse->mutable_entity_warnings()); - this->wrapped->mutable_infos()->Swap(idResponse->mutable_entity_infos()); + this->errors.wrapped->Swap(idResponse->mutable_errors()); + this->warnings.wrapped->Swap(idResponse->mutable_warnings()); + this->infos.wrapped->Swap(idResponse->mutable_infos()); } auto Entity::SetId(const std::string &id) -> void { this->wrapped->set_id(id); } @@ -205,13 +205,13 @@ auto Entity::SetFilePath(const std::string &path) -> void { this->wrapped->mutable_file_descriptor()->set_path(path); } -auto Entity::SetFileTransmissionId(const std::string ®istration_id, - const std::string &file_id) -> void { - this->file_transmission_id = - google::protobuf::Arena::CreateMessage<FileTransmissionId>(get_arena()); +// auto Entity::SetFileTransmissionId(const std::string ®istration_id, +// const std::string &file_id) -> void { +// this->file_transmission_id = +// Arena::CreateMessage<FileTransmissionId>(get_arena()); - this->file_transmission_id->set_registration_id(registration_id); - this->file_transmission_id->set_file_id(file_id); -} +// this->file_transmission_id->set_registration_id(registration_id); +// this->file_transmission_id->set_file_id(file_id); +//} } // namespace caosdb::entity diff --git a/src/caosdb/filestreaming/Client.cpp b/src/caosdb/filestreaming/Client.cpp new file mode 100644 index 0000000..4bfc55f --- /dev/null +++ b/src/caosdb/filestreaming/Client.cpp @@ -0,0 +1,84 @@ +#include "caosdb/filestreaming/Client.h" +#include "caosdb/logging.h" +#include "caosdb/status_code.h" +#include "caosdb/filestreaming/DownloadRequestHandler.h" +#include "caosdb/filestreaming/UploadRequestHandler.h" + +namespace FileExchange { +using caosdb::StatusCode; + +FileExchangeClient::~FileExchangeClient() { + this->cancel(); + + cq_.Shutdown(); + + // drain the queue + void *ignoredTag = nullptr; + bool ok = false; + while (cq_.Next(&ignoredTag, &ok)) + ; +} + +StatusCode FileExchangeClient::upload(const FileDescriptor &file_descriptor) { + handler_ = std::make_unique<UploadRequestHandler>(&handler_, stub_.get(), + &cq_, file_descriptor); + + int status = this->processMessages(); + if (status > 0) { + return StatusCode::FILE_UPLOAD_ERROR; + } + return StatusCode::SUCCESS; +} + +StatusCode FileExchangeClient::download(const FileDescriptor &file_descriptor) { + handler_ = std::make_unique<DownloadRequestHandler>(&handler_, stub_.get(), + &cq_, file_descriptor); + + int status = this->processMessages(); + if (status > 0) { + return StatusCode::FILE_DOWNLOAD_ERROR; + } + return StatusCode::SUCCESS; +} + +void FileExchangeClient::cancel() { + if (handler_) { + handler_->cancel(); + } +} + +int FileExchangeClient::processMessages() { + try { + void *tag = nullptr; + bool ok = false; + while (true) { + if (cq_.Next(&tag, &ok)) { + if (tag) { + // TODO assert + auto res = handler_->onNext(ok); + if (!res) { + // TODO comment + handler_.reset(); + break; + } + } else { + CAOSDB_LOG_ERROR(logger_name) + << "Invalid tag delivered by notification queue."; + } + } else { + CAOSDB_LOG_ERROR(logger_name) + << "Notification queue has been shut down unexpectedly."; + return 1; + } + } + } catch (std::exception &e) { + CAOSDB_LOG_ERROR(logger_name) << "Caught exception: " << e.what(); + return 1; + } catch (...) { + CAOSDB_LOG_ERROR(logger_name) << "Caught unknown exception."; + return 1; + } + return 0; +} + +} // namespace FileExchange diff --git a/src/caosdb/filestreaming/DownloadRequestHandler.cpp b/src/caosdb/filestreaming/DownloadRequestHandler.cpp new file mode 100644 index 0000000..a485e4c --- /dev/null +++ b/src/caosdb/filestreaming/DownloadRequestHandler.cpp @@ -0,0 +1,114 @@ +#include "caosdb/filestreaming/DownloadRequestHandler.h" +#include "caosdb/protobuf_helper.h" + +#include <grpc/support/log.h> + +#include <iostream> + +namespace FileExchange { +using caosdb::utility::get_arena; +using google::protobuf::Arena; + +DownloadRequestHandler::DownloadRequestHandler( + HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, const FileDescriptor &file_descriptor) + : tag_(tag), stub_(stub), cq_(cq), state_(CallState::NewCall), + file_descriptor_(file_descriptor), bytesReceived_(0), + request_(Arena::CreateMessage<FileDownloadRequest>(get_arena())), + response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())) { + this->onNext(true); +} + +bool DownloadRequestHandler::onNext(bool ok) { + try { + if (ok) { + if (state_ == CallState::NewCall) { + this->handleNewCallState(); + } else if (state_ == CallState::SendingRequest) { + this->handleSendingRequestState(); + } else if (state_ == CallState::ReceivingFile) { + this->handleReceivingFileState(); + } else if (state_ == CallState::CallComplete) { + this->handleCallCompleteState(); + return false; // TODO comment + } + } else { + state_ = CallState::CallComplete; + rpc_->Finish(&status_, tag_); + } + + return true; + } catch (std::exception &e) { + gpr_log(GPR_ERROR, "Download processing error: %s", e.what()); + } catch (...) { + gpr_log(GPR_ERROR, "Download processing error: unknown exception caught"); + } + + if (state_ == CallState::NewCall) { + // TODO comment + return false; + } + + ctx_.TryCancel(); + + return true; +} + +void DownloadRequestHandler::cancel() { ctx_.TryCancel(); } + +void DownloadRequestHandler::handleNewCallState() { + const std::size_t ServerDefaultChunkSize = 0; + + request_->mutable_file_transmission_id()->CopyFrom( + *(file_descriptor_.file_transmission_id)); + + rpc_ = stub_->PrepareAsyncFileDownload(&ctx_, *request_, cq_); + + state_ = CallState::SendingRequest; + rpc_->StartCall(tag_); +} + +void DownloadRequestHandler::handleSendingRequestState() { + state_ = CallState::ReceivingFile; + rpc_->Read(response_, tag_); +} + +void DownloadRequestHandler::handleReceivingFileState() { + if (response_->has_chunk()) { + auto &chunkData = response_->chunk().data(); + if (chunkData.empty()) { + gpr_log(GPR_INFO, "Received an empty FileChunk, ignoring"); + } else { + fileWriter_->write(chunkData); + bytesReceived_ += chunkData.size(); + } + + state_ = CallState::ReceivingFile; + response_->Clear(); + rpc_->Read(response_, tag_); + } else { + throw std::runtime_error("File chunk expected"); + } +} + +void DownloadRequestHandler::handleCallCompleteState() { + switch (status_.error_code()) { + case grpc::OK: + std::cout << "[" << file_descriptor_.local_path + << "]: download complete: " << bytesReceived_ << " bytes received" + << std::endl; + break; + + case grpc::CANCELLED: + std::cout << "[" << file_descriptor_.local_path << "]: download cancelled" + << std::endl; + break; + + default: + std::cout << "[" << file_descriptor_.local_path + << "]: download failed: " << status_.error_message() << std::endl; + break; + } +} + +} // namespace FileExchange diff --git a/src/caosdb/filestreaming/FileManager.cpp b/src/caosdb/filestreaming/FileManager.cpp new file mode 100644 index 0000000..47a013a --- /dev/null +++ b/src/caosdb/filestreaming/FileManager.cpp @@ -0,0 +1,59 @@ +#include "caosdb/filestreaming/FileManager.h" +#include "caosdb/filestreaming/FileError.h" + +using namespace FileExchange; + +using FileMutexSP = std::shared_ptr<FileMutex>; + +FileManager::FileManager(const std::string &filename) { + this->manageFile(filename); +} + +bool FileManager::isManaged(const std::string &filename) const { + std::lock_guard<std::mutex> lock(mgrMutex_); + + return fileMutexes_.count(filename) > 0; +} + +std::unique_ptr<FileReader> +FileManager::readFile(const std::string &filename) const { + FileMutexSP *mutexPtr = nullptr; + + { + std::lock_guard<std::mutex> lock(mgrMutex_); + + auto it = fileMutexes_.find(filename); + if (it == fileMutexes_.end()) { + std::string message = "File is not managed by the server: " + filename; + throw FileNotManagedError(message); + } else { + mutexPtr = &it->second; + } + } + + return std::make_unique<FileReader>(filename, *mutexPtr); +} + +std::unique_ptr<FileWriter> +FileManager::writeFile(const std::string &filename) { + FileMutexSP *mutexPtr = nullptr; + + { + std::lock_guard<std::mutex> lock(mgrMutex_); + + auto it = fileMutexes_.find(filename); + if (it == fileMutexes_.end()) { + it = fileMutexes_.emplace(filename, std::make_shared<FileMutex>()).first; + } + + mutexPtr = &it->second; + } + + return std::make_unique<FileWriter>(filename, *mutexPtr); +} + +void FileManager::manageFile(const std::string &filename) { + if (fileMutexes_.count(filename) == 0) { + fileMutexes_.emplace(filename, std::make_shared<FileMutex>()); + } +} diff --git a/src/caosdb/filestreaming/FileReader.cpp b/src/caosdb/filestreaming/FileReader.cpp new file mode 100644 index 0000000..33df9f4 --- /dev/null +++ b/src/caosdb/filestreaming/FileReader.cpp @@ -0,0 +1,52 @@ +#include "caosdb/filestreaming/FileReader.h" +#include "caosdb/filestreaming/FileError.h" + +using namespace FileExchange; + +FileReader::FileReader(const boost::filesystem::path &filename) + : filename_(filename), size_(0) { + this->openFile(); +} + +FileReader::FileReader(const boost::filesystem::path &filename, + const std::shared_ptr<FileMutex> &mutexPtr) + : filename_(filename), size_(0), mutexPtr_(mutexPtr) { + this->openFile(); +} + +void FileReader::openFile() { + if (mutexPtr_) { + lock_ = FileReadLock(*mutexPtr_, std::try_to_lock); + if (!lock_) { + throw FileLockError("Can't lock file for reading: " + filename_.string()); + } + } + + stream_.open(filename_, std::ios::binary | std::ios::ate); + if (!stream_) { + throw FileIOError("Can't open file for reading: " + filename_.string()); + } + + auto size = stream_.tellg(); + stream_.seekg(0); + if (size > 0) { + size_ = static_cast<decltype(size_)>(size); + } +} + +std::size_t FileReader::read(std::string &buffer) { + std::size_t bytesRead = 0; + + if (!stream_.eof()) { + auto bufferSize = buffer.size(); + if (bufferSize > 0) { + if (!stream_.read(&buffer[0], bufferSize)) { + throw FileIOError("Can't read file: " + filename_.string()); + } + + bytesRead = static_cast<std::size_t>(stream_.gcount()); + } + } + + return bytesRead; +} diff --git a/src/caosdb/filestreaming/FileWriter.cpp b/src/caosdb/filestreaming/FileWriter.cpp new file mode 100644 index 0000000..9c99331 --- /dev/null +++ b/src/caosdb/filestreaming/FileWriter.cpp @@ -0,0 +1,37 @@ +#include "caosdb/filestreaming/FileWriter.h" +#include "caosdb/filestreaming/FileError.h" + +using namespace FileExchange; + +FileWriter::FileWriter(const std::string &filename) : filename_(filename) { + this->openFile(); +} + +FileWriter::FileWriter(const std::string &filename, + const std::shared_ptr<FileMutex> &mutexPtr) + : filename_(filename), mutexPtr_(mutexPtr) { + this->openFile(); +} + +void FileWriter::openFile() { + if (mutexPtr_) { + lock_ = FileWriteLock(*mutexPtr_, std::try_to_lock); + if (!lock_) { + throw FileLockError("Can't lock file for writing: " + filename_); + } + } + + stream_.open(filename_, std::ios::binary | std::ios::trunc); + if (!stream_) { + throw FileIOError("Can't open file for writing: " + filename_); + } +} + +void FileWriter::write(const std::string &buffer) { + auto bufferSize = buffer.size(); + if (bufferSize > 0) { + if (!stream_.write(buffer.data(), bufferSize)) { + throw FileIOError("Can't write file: " + filename_); + } + } +} diff --git a/src/caosdb/filestreaming/LICENSE b/src/caosdb/filestreaming/LICENSE new file mode 100644 index 0000000..83d0cd5 --- /dev/null +++ b/src/caosdb/filestreaming/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 NeiRo21 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/caosdb/filestreaming/RequestStatus.cpp b/src/caosdb/filestreaming/RequestStatus.cpp new file mode 100644 index 0000000..e6f7308 --- /dev/null +++ b/src/caosdb/filestreaming/RequestStatus.cpp @@ -0,0 +1,19 @@ +#include "caosdb/filestreaming/RequestStatus.h" + +namespace FileExchange { + +namespace RequestStatus { + +const grpc::Status FileHeaderExpected(grpc::INVALID_ARGUMENT, + "File header expected"); +const grpc::Status FileNameEmpty(grpc::INVALID_ARGUMENT, "File name is empty"); +const grpc::Status FileChunkExpected(grpc::INVALID_ARGUMENT, + "File chunk expected"); +const grpc::Status FileNotFound(grpc::NOT_FOUND, "File not found"); +const grpc::Status FileLocked(grpc::UNAVAILABLE, "File is locked"); +const grpc::Status FileIOError(grpc::ABORTED, "File IO error"); +const grpc::Status UnknownError(grpc::UNKNOWN, "Unknown error"); + +} // namespace RequestStatus + +} // namespace FileExchange diff --git a/src/caosdb/filestreaming/UploadRequestHandler.cpp b/src/caosdb/filestreaming/UploadRequestHandler.cpp new file mode 100644 index 0000000..30f4ce0 --- /dev/null +++ b/src/caosdb/filestreaming/UploadRequestHandler.cpp @@ -0,0 +1,141 @@ +#include "caosdb/filestreaming/UploadRequestHandler.h" +#include "caosdb/protobuf_helper.h" +#include "caosdb/exceptions.h" +#include "caosdb/status_code.h" +#include "caosdb/logging.h" + +#include <algorithm> +#include <iostream> + +namespace FileExchange { +using caosdb::StatusCode; +using caosdb::exceptions::AuthenticationError; +using caosdb::exceptions::ConnectionError; +using caosdb::exceptions::Exception; +using caosdb::utility::get_arena; +using google::protobuf::Arena; + +UploadRequestHandler::UploadRequestHandler( + HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, const FileDescriptor &file_descriptor) + : tag_(tag), stub_(stub), cq_(cq), state_(CallState::NewCall), + file_descriptor_(file_descriptor), bytesToSend_(0), + request_(Arena::CreateMessage<FileUploadRequest>(get_arena())), + response_(Arena::CreateMessage<FileUploadResponse>(get_arena())) { + this->onNext(true); +} + +bool UploadRequestHandler::onNext(bool ok) { + try { + if (state_ == CallState::CallComplete) { + this->handleCallCompleteState(); + return false; // TODO comment + } else if (ok) { + if (state_ == CallState::NewCall) { + this->handleNewCallState(); + } else if (state_ == CallState::SendingHeader) { + this->handleSendingHeaderState(); + } else if (state_ == CallState::SendingFile) { + this->handleSendingFileState(); + } else if (state_ == CallState::ExpectingResponse) { + this->handleExpectingResponseState(); + } + } else { + state_ = CallState::CallComplete; + rpc_->Finish(&status_, tag_); + } + + return true; + } catch (Exception &e) { + throw; + } catch (std::exception &e) { + CAOSDB_LOG_ERROR(logger_name) << "Upload processing error: " << e.what(); + } catch (...) { + CAOSDB_LOG_ERROR(logger_name) + << "Upload processing error: unknown exception caught"; + } + + if (state_ == CallState::NewCall) { + // TODO comment + return false; + } + + ctx_.TryCancel(); + + return true; +} + +void UploadRequestHandler::cancel() { ctx_.TryCancel(); } + +void UploadRequestHandler::handleNewCallState() { + auto filename = file_descriptor_.local_path; + fileReader_ = std::make_unique<FileReader>(filename); + + rpc_ = stub_->PrepareAsyncFileUpload(&ctx_, response_, cq_); + + state_ = CallState::SendingHeader; + rpc_->StartCall(tag_); +} + +void UploadRequestHandler::handleSendingHeaderState() { + auto *tid = request_->mutable_chunk()->mutable_file_transmission_id(); + tid->CopyFrom(*(file_descriptor_.file_transmission_id)); + + bytesToSend_ = fileReader_->fileSize(); + + if (bytesToSend_ > 0) { + state_ = CallState::SendingFile; + } else { + state_ = CallState::ExpectingResponse; + } + + rpc_->Write(*request_, tag_); +} + +void UploadRequestHandler::handleSendingFileState() { + const unsigned long long DefaultChunkSize = 4 * 1024; // 4K + + auto chunkSize = std::min(DefaultChunkSize, bytesToSend_); + + request_->Clear(); + auto buffer = request_->mutable_chunk()->mutable_data(); + buffer->resize(chunkSize); + + fileReader_->read(*buffer); + bytesToSend_ -= chunkSize; + + grpc::WriteOptions writeOptions; + if (bytesToSend_ > 0) { + state_ = CallState::SendingFile; + } else { + state_ = CallState::ExpectingResponse; + writeOptions.set_last_message(); + } + + rpc_->Write(*request_, writeOptions, tag_); +} + +void UploadRequestHandler::handleExpectingResponseState() { + state_ = CallState::CallComplete; + rpc_->Finish(&status_, tag_); +} + +void UploadRequestHandler::handleCallCompleteState() { + switch (status_.error_code()) { + case grpc::OK: { + auto bytesSent = fileReader_ ? fileReader_->fileSize() : 0; + CAOSDB_LOG_INFO(logger_name) + << "[" << file_descriptor_.local_path + << "]: upload complete: " << bytesSent << " bytes sent" << std::endl; + } break; + + case grpc::UNAUTHENTICATED: + throw AuthenticationError(status_.error_message()); + case grpc::UNAVAILABLE: + throw ConnectionError(status_.error_message()); + default: + throw Exception(StatusCode::GENERIC_RPC_ERROR, status_.error_message()); + } +} + +} // namespace FileExchange diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index e2c5243..6f4155e 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -20,6 +20,7 @@ #include "caosdb/transaction.h" #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS... #include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest +#include "caosdb/filestreaming/Client.h" #include "caosdb/logging.h" #include "caosdb/protobuf_helper.h" // for get_arena #include "caosdb/status_code.h" // for StatusCode, AUTHEN... @@ -79,6 +80,10 @@ auto get_status_description(int code) -> const std::string & { "have " "an id. This is the case when you did not retrieve it before applying any " "changes and instantiated the Entity class explicitely."}, + {StatusCode::NOT_A_FILE_ENTITY, "You can only add files to file entities."}, + {StatusCode::PATH_IS_A_DIRECTORY, "The given path is a directory."}, + {StatusCode::FILE_DOES_NOT_EXIST_LOCALLY, + "The file does not not exist in the local file system."}, {StatusCode::UNSUPPORTED_FEATURE, "This feature is not available in the this client implementation."}}; try { @@ -92,28 +97,22 @@ auto get_status_description(int code) -> const std::string & { namespace caosdb::transaction { using caosdb::entity::v1alpha1::EntityTransactionService; -using caosdb::entity::v1alpha1::FileDownloadRequest; -using caosdb::entity::v1alpha1::FileDownloadResponse; using caosdb::entity::v1alpha1::FileTransmissionService; -using caosdb::entity::v1alpha1::FileUploadRequest; -using caosdb::entity::v1alpha1::FileUploadResponse; using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionResponse; -using caosdb::entity::v1alpha1::RegisterFileDownloadRequest; -using caosdb::entity::v1alpha1::RegisterFileDownloadResponse; -using caosdb::entity::v1alpha1::RegisterFileUploadRequest; -using caosdb::entity::v1alpha1::RegisterFileUploadResponse; -using WrappedResponseCase = +using TransactionResponseCase = caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase; using QueryResponseCase = - caosdb::entity::v1alpha1::RetrieveResponse::QueryResponseCase; + caosdb::entity::v1alpha1::RetrieveResponse::WrappedResponseCase; using caosdb::utility::get_arena; using grpc::ClientAsyncResponseReader; using ProtoEntity = caosdb::entity::v1alpha1::Entity; +using FileExchange::FileExchangeClient; +using google::protobuf::Arena; using grpc::CompletionQueue; MultiResultSet::MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set) - : IMultiResultSet<Entity>(std::move(result_set)) {} + : AbstractMultiResultSet<Entity>(std::move(result_set)) {} [[nodiscard]] auto UniqueResult::GetEntity() const -> const Entity & { const Entity *result = this->entity.get(); @@ -123,10 +122,8 @@ MultiResultSet::MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set) Transaction::Transaction( std::shared_ptr<EntityTransactionService::Stub> entity_service, std::shared_ptr<FileTransmissionService::Stub> file_service) - : request(google::protobuf::Arena::CreateMessage<MultiTransactionRequest>( - get_arena())), - response(google::protobuf::Arena::CreateMessage<MultiTransactionResponse>( - get_arena())) { + : request(Arena::CreateMessage<MultiTransactionRequest>(get_arena())), + response(Arena::CreateMessage<MultiTransactionResponse>(get_arena())) { this->entity_service = std::move(entity_service); this->file_service = std::move(file_service); this->query_count = -1; @@ -142,6 +139,19 @@ auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode { return this->status.GetCode(); } +auto Transaction::RetrieveAndDownloadFilesById(const std::string &id) noexcept + -> StatusCode { + ASSERT_CAN_ADD_RETRIEVAL + + auto *retrieve_request = + this->request->add_requests()->mutable_retrieve_request(); + retrieve_request->set_id(id); + retrieve_request->set_register_file_download(true); + + this->status = TransactionStatus::GO_ON(); + return this->status.GetCode(); +} + auto Transaction::Query(const std::string &query) noexcept -> StatusCode { ASSERT_CAN_ADD_QUERY @@ -168,14 +178,17 @@ auto Transaction::DeleteById(const std::string &id) noexcept -> StatusCode { auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode { ASSERT_CAN_ADD_INSERTION - auto *sub_request = this->request->add_requests(); - auto *proto_entity = sub_request->mutable_insert_request()->mutable_entity(); + auto *entity_request = this->request->add_requests() + ->mutable_insert_request() + ->mutable_entity_request(); + auto *proto_entity = entity_request->mutable_entity(); // copy the original entity for the transaction entity->CopyTo(proto_entity); if (entity->HasFile()) { - sub_request->mutable_insert_request()->mutable_upload_id()->CopyFrom( - entity->GetFileTransmissionId()); + auto *file_transmission_id = entity_request->mutable_upload_id(); + entity->SetFileTransmissionId(file_transmission_id); + upload_files.push_back(entity->GetFileDescriptor()); } this->status = TransactionStatus::READY(); return this->status.GetCode(); @@ -184,10 +197,17 @@ auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode { auto Transaction::UpdateEntity(Entity *entity) noexcept -> StatusCode { ASSERT_CAN_ADD_UPDATE - auto *sub_request = this->request->add_requests(); - auto *proto_entity = sub_request->mutable_update_request()->mutable_entity(); + auto *entity_request = this->request->add_requests() + ->mutable_update_request() + ->mutable_entity_request(); + auto *proto_entity = entity_request->mutable_entity(); entity->CopyTo(proto_entity); + if (entity->HasFile()) { + auto *file_transmission_id = entity_request->mutable_upload_id(); + entity->SetFileTransmissionId(file_transmission_id); + upload_files.push_back(entity->GetFileDescriptor()); + } this->status = TransactionStatus::READY(); return this->status.GetCode(); } @@ -224,6 +244,29 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { } this->status = TransactionStatus::EXECUTING(); + // upload files first + if (upload_files.size() > 0) { + auto *registration_response = + Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); + RegisterUploadFile(registration_response); + + // TODO if(registration_response.status != REGISTRATION_STATUS_ACCEPTED){ + // return StatusCode::FILE_UPLOAD_FAILED + // } + FileExchangeClient upload_client(file_service); + for (auto file_descriptor : upload_files) { + file_descriptor.file_transmission_id->set_registration_id( + registration_response->registration_id()); + CAOSDB_LOG_INFO(logger_name) + << "Uploading " << file_descriptor.local_path; + auto file_upload_status = upload_client.upload(file_descriptor); + if (!file_upload_status == StatusCode::SUCCESS) { + this->status = TransactionStatus::FILE_UPLOAD_ERROR(); + return StatusCode::EXECUTING; + } + } + } + grpc::Status grpc_status; CompletionQueue cq; @@ -269,9 +312,9 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { switch (responses->wrapped_response_case()) { case WrappedResponseCase::kRetrieveResponse: { auto *retrieve_response = responses->mutable_retrieve_response(); - switch (retrieve_response->query_response_case()) { - case QueryResponseCase::kEntity: { - auto *entity = retrieve_response->release_entity(); + switch (retrieve_response->wrapped_response_case()) { + case QueryResponseCase::kEntityResponse: { + auto *entity = retrieve_response->release_entity_response(); if (!entity->errors().empty()) { this->status = TransactionStatus::TRANSACTION_ERROR( "The request returned with errors."); @@ -294,30 +337,33 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { } break; case WrappedResponseCase::kUpdateResponse: { auto *updatedIdResponse = responses->mutable_update_response(); - if (!updatedIdResponse->entity_errors().empty()) { + if (!updatedIdResponse->id_response().errors().empty()) { this->status = TransactionStatus::TRANSACTION_ERROR( "The request returned with errors."); } - this->result_set = std::make_unique<UniqueResult>(updatedIdResponse); + this->result_set = std::make_unique<UniqueResult>( + updatedIdResponse->release_id_response()); } break; case WrappedResponseCase::kInsertResponse: { auto *insertedIdResponse = responses->mutable_insert_response(); - if (!insertedIdResponse->entity_errors().empty()) { + if (!insertedIdResponse->id_response().errors().empty()) { this->status = TransactionStatus::TRANSACTION_ERROR( "The request returned with errors."); } - this->result_set = std::make_unique<UniqueResult>(insertedIdResponse); + this->result_set = std::make_unique<UniqueResult>( + insertedIdResponse->release_id_response()); } break; case WrappedResponseCase::kDeleteResponse: { auto *deletedIdResponse = responses->mutable_delete_response(); - if (!deletedIdResponse->entity_errors().empty()) { + if (!deletedIdResponse->id_response().errors().empty()) { this->status = TransactionStatus::TRANSACTION_ERROR( "The request returned with errors."); } - this->result_set = std::make_unique<UniqueResult>(deletedIdResponse); + this->result_set = std::make_unique<UniqueResult>( + deletedIdResponse->release_id_response()); } break; default: - // TODO(tf) Error and Update + // TODO(tf) Error break; } } else { @@ -325,20 +371,24 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { std::vector<std::unique_ptr<Entity>> entities; for (auto sub_response : *responses) { switch (sub_response.wrapped_response_case()) { - case WrappedResponseCase::kRetrieveResponse: + case TransactionResponseCase::kRetrieveResponse: entities.push_back(std::make_unique<Entity>( - sub_response.mutable_retrieve_response()->release_entity())); + sub_response.mutable_retrieve_response()->release_entity_response())); break; - case WrappedResponseCase::kInsertResponse: - entities.push_back( - std::make_unique<Entity>(sub_response.release_insert_response())); + case TransactionResponseCase::kInsertResponse: + entities.push_back(std::make_unique<Entity>( + sub_response.mutable_insert_response()->release_id_response())); + break; + case TransactionResponseCase::kDeleteResponse: + entities.push_back(std::make_unique<Entity>( + sub_response.mutable_delete_response()->release_id_response())); break; - case WrappedResponseCase::kDeleteResponse: - entities.push_back( - std::make_unique<Entity>(sub_response.release_insert_response())); + case TransactionResponseCase::kUpdateResponse: + entities.push_back(std::make_unique<Entity>( + sub_response.mutable_update_response()->release_id_response())); break; default: - // TODO(tf) Updates + // TODO(tf) Errors break; } } @@ -372,85 +422,58 @@ auto Transaction::RegisterUploadFile(RegisterFileUploadResponse *response) assert(ok); } -auto Transaction::UploadFile(FileUploadResponse *response, - const std::string ®istration_id) -> void { - grpc::Status grpc_status; - CompletionQueue cq; - grpc::ClientContext context; - - FileUploadRequest request; - auto *chunk = request.mutable_chunk(); - auto *file_transmission_id = chunk->mutable_file_transmission_id(); - file_transmission_id->set_registration_id(registration_id); - file_transmission_id->set_file_id("test.txt"); - chunk->set_data("this is some data"); - - std::unique_ptr<ClientAsyncResponseReader<FileUploadResponse>> rpc( - this->file_service->PrepareAsyncFileUpload(&context, request, &cq)); - - rpc->StartCall(); - - int tag = 1; - void *send_tag = static_cast<void *>(&tag); - rpc->Finish(response, &grpc_status, send_tag); - void *recv_tag = nullptr; - bool ok = false; - - cq.Next(&recv_tag, &ok); - assert(recv_tag == send_tag); - assert(ok); -} - -auto Transaction::DownloadFile( - FileDownloadResponse *response, - const RegisterFileDownloadResponse ®istration_response) -> void { - - FileDownloadRequest request; - auto *file_transmission_id = request.mutable_file_transmission_id(); - file_transmission_id->CopyFrom( - registration_response.files(0).file_transmission_id()); - grpc::Status grpc_status; - CompletionQueue cq; - - grpc::ClientContext context; - std::unique_ptr<ClientAsyncResponseReader<FileDownloadResponse>> rpc( - this->file_service->PrepareAsyncFileDownload(&context, request, &cq)); - - rpc->StartCall(); - - int tag = 1; - void *send_tag = static_cast<void *>(&tag); - rpc->Finish(response, &grpc_status, send_tag); - void *recv_tag = nullptr; - bool ok = false; - - cq.Next(&recv_tag, &ok); - assert(recv_tag == send_tag); - assert(ok); -} - -auto Transaction::RegisterDownloadFile( - const RegisterFileDownloadRequest &request, - RegisterFileDownloadResponse *response) -> void { - grpc::Status grpc_status; - CompletionQueue cq; - - grpc::ClientContext context; - std::unique_ptr<ClientAsyncResponseReader<RegisterFileDownloadResponse>> rpc( - this->file_service->PrepareAsyncRegisterFileDownload(&context, request, - &cq)); - - rpc->StartCall(); - - int tag = 1; - void *send_tag = static_cast<void *>(&tag); - rpc->Finish(response, &grpc_status, send_tag); - void *recv_tag = nullptr; - bool ok = false; - - cq.Next(&recv_tag, &ok); - assert(recv_tag == send_tag); - assert(ok); -} +// auto Transaction::UploadFile(FileUploadResponse *response, +// const FileDescriptor &file_descriptor, +// const std::string ®istration_id) -> void { +// grpc::Status grpc_status; +// CompletionQueue cq; +// grpc::ClientContext context; + +// FileUploadRequest request; +// auto *chunk = request.mutable_chunk(); +// auto *file_transmission_id = chunk->mutable_file_transmission_id(); +// file_transmission_id->set_registration_id(registration_id); +// file_transmission_id->set_file_id("test.txt"); +// chunk->set_data("this is some data"); + +// std::unique_ptr<ClientAsyncResponseReader<FileUploadResponse>> rpc( +// this->file_service->PrepareAsyncFileUpload(&context, request, &cq)); + +// rpc->StartCall(); + +// int tag = 1; +// void *send_tag = static_cast<void *>(&tag); +// rpc->Finish(response, &grpc_status, send_tag); +// void *recv_tag = nullptr; +// bool ok = false; + +// cq.Next(&recv_tag, &ok); +// assert(recv_tag == send_tag); +// assert(ok); +//} + +// auto Transaction::DownloadFile(FileDownloadResponse *response, +// const FileTransmissionId &file_transmission_id) +//-> void { + +// FileDownloadRequest request; +// request.mutable_file_transmission_id()->CopyFrom(file_transmission_id); +// grpc::Status grpc_status; +// CompletionQueue cq; + +// grpc::ClientContext context; +// std::unique_ptr<ClientAsyncResponseReader<FileDownloadResponse>> rpc( +// this->file_service->PrepareAsyncFileDownload(&context, request, &cq)); + +// rpc->StartCall(); + +// int tag = 1; +// void *send_tag = static_cast<void *>(&tag); +// rpc->Finish(response, &grpc_status, send_tag); +// void *recv_tag = nullptr; +// bool ok = false; + +// cq.Next(&recv_tag, &ok); +//} } // namespace caosdb::transaction diff --git a/test/caosdb_test_utility.h.in b/test/caosdb_test_utility.h.in index 24d088b..baac638 100644 --- a/test/caosdb_test_utility.h.in +++ b/test/caosdb_test_utility.h.in @@ -22,6 +22,7 @@ #ifndef CAOSDB_TEST_UTILITY_H #define CAOSDB_TEST_UTILITY_H +#include <string> /** * @file caosdb_test_utility.h * @brief Utility for the unit tests diff --git a/test/test_data/test_caosdb_client.json b/test/test_data/test_caosdb_client.json index 2007413..b65d79b 100644 --- a/test/test_data/test_caosdb_client.json +++ b/test/test_data/test_caosdb_client.json @@ -25,12 +25,6 @@ "sinks": { "stderr": { "destination": "console" - }, - "file" : { - "destination": "file" - }, - "syslog": { - "destination": "syslog" } } }, diff --git a/test/test_entity.cpp b/test/test_entity.cpp index 2794e9b..0b81f68 100644 --- a/test/test_entity.cpp +++ b/test/test_entity.cpp @@ -20,6 +20,7 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. * */ +#include "caosdb_test_utility.h" #include "caosdb/entity.h" // for Entity, Parent, Par... #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionSe... #include "caosdb/entity/v1alpha1/main.pb.h" // for IdResponse, Message @@ -230,7 +231,7 @@ TEST(test_entity, test_insert_with_property) { TEST(test_entity, test_from_id_response) { IdResponse idResponse; idResponse.set_id("entity_id"); - auto *error = idResponse.add_entity_errors(); + auto *error = idResponse.add_errors(); error->set_code(MessageCode::ENTITY_DOES_NOT_EXIST); error->set_description("error_desc"); @@ -245,10 +246,10 @@ TEST(test_entity, test_from_id_response) { IdResponse idr_warnings_and_infos; idr_warnings_and_infos.set_id("other_entity_id"); - auto *warning = idr_warnings_and_infos.add_entity_warnings(); + auto *warning = idr_warnings_and_infos.add_warnings(); warning->set_description("warning_desc"); warning->set_code(MessageCode::ENTITY_HAS_NO_PROPERTIES); - auto *info = idr_warnings_and_infos.add_entity_infos(); + auto *info = idr_warnings_and_infos.add_infos(); info->set_description("info_desc"); info->set_code(MessageCode::UNSPECIFIED); @@ -264,4 +265,29 @@ TEST(test_entity, test_from_id_response) { EXPECT_EQ(other_ent.GetInfos().At(0).GetDescription(), "info_desc"); EXPECT_EQ(other_ent.GetInfos().At(0).GetCode(), MessageCode::UNSPECIFIED); } + +TEST(test_entity, test_add_file_to_non_file_entity) { + Entity entity; + EXPECT_EQ(entity.SetLocalPath("local/path"), StatusCode::NOT_A_FILE_ENTITY); +} + +TEST(test_entity, test_add_non_existing_file) { + Entity entity; + entity.SetRole("File"); + EXPECT_EQ(entity.SetLocalPath("non-existing/path"), + StatusCode::FILE_DOES_NOT_EXIST_LOCALLY); +} + +TEST(test_entity, test_add_directory_path) { + Entity entity; + entity.SetRole("File"); + EXPECT_EQ(entity.SetLocalPath("./"), StatusCode::PATH_IS_A_DIRECTORY); +} + +TEST(test_entity, test_add_file) { + Entity entity; + entity.SetRole("File"); + EXPECT_EQ(entity.SetLocalPath(TEST_DATA_DIR + "/test.json"), + StatusCode::SUCCESS); +} } // namespace caosdb::entity diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index b062cb8..1ed3a07 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -56,9 +56,9 @@ TEST(test_transaction, create_transaction) { } TEST(test_transaction, unique_result) { - auto *entity = new ProtoEntity(); - entity->set_id("test"); - UniqueResult result(entity); + auto *entity_response = new EntityResponse; + entity_response->mutable_entity()->set_id("test"); + UniqueResult result(entity_response); EXPECT_EQ("test", result.GetEntity().GetId()); @@ -102,8 +102,9 @@ TEST(test_transaction, test_multi_result_set_empty) { TEST(test_transaction, test_multi_result_iterator) { std::vector<std::unique_ptr<Entity>> one_elem; RetrieveResponse response; - response.mutable_entity()->set_id("100"); - one_elem.push_back(std::make_unique<Entity>(response.release_entity())); + response.mutable_entity_response()->mutable_entity()->set_id("100"); + one_elem.push_back( + std::make_unique<Entity>(response.release_entity_response())); MultiResultSet rs(std::move(one_elem)); EXPECT_EQ(rs.Size(), 1); @@ -114,8 +115,8 @@ TEST(test_transaction, test_multi_result_iterator) { } TEST(test_transaction, test_unique_result_iterator) { - caosdb::entity::v1alpha1::Entity response; - response.set_id("100"); + caosdb::entity::v1alpha1::EntityResponse response; + response.mutable_entity()->set_id("100"); UniqueResult rs(&response); EXPECT_EQ(rs.Size(), 1); @@ -127,8 +128,9 @@ TEST(test_transaction, test_unique_result_iterator) { TEST(test_transaction, test_multi_result_set_one) { std::vector<std::unique_ptr<Entity>> one_elem; RetrieveResponse response; - response.mutable_entity()->set_id("100"); - one_elem.push_back(std::make_unique<Entity>(response.release_entity())); + response.mutable_entity_response()->mutable_entity()->set_id("100"); + one_elem.push_back( + std::make_unique<Entity>(response.release_entity_response())); MultiResultSet rs(std::move(one_elem)); EXPECT_EQ(rs.Size(), 1); @@ -141,14 +143,17 @@ TEST(test_transaction, test_multi_result_set_three) { MultiTransactionResponse response; response.add_responses() ->mutable_retrieve_response() + ->mutable_entity_response() ->mutable_entity() ->set_id("100"); - auto *entity_with_error = - response.add_responses()->mutable_retrieve_response()->mutable_entity(); - entity_with_error->set_id("101"); + auto *entity_with_error = response.add_responses() + ->mutable_retrieve_response() + ->mutable_entity_response(); + entity_with_error->mutable_entity()->set_id("101"); entity_with_error->add_errors()->set_code(1); response.add_responses() ->mutable_retrieve_response() + ->mutable_entity_response() ->mutable_entity() ->set_id("102"); @@ -156,7 +161,7 @@ TEST(test_transaction, test_multi_result_set_three) { std::vector<std::unique_ptr<Entity>> entities; for (auto sub_response : *responses) { three_elem.push_back(std::make_unique<Entity>( - sub_response.mutable_retrieve_response()->release_entity())); + sub_response.mutable_retrieve_response()->release_entity_response())); } MultiResultSet rs(std::move(three_elem)); @@ -189,4 +194,36 @@ TEST(test_transaction, test_multi_deletion) { } } +TEST(test_transaction, test_retrieve_and_download) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveAndDownloadFilesById("asdf"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::CONNECTION_ERROR); +} + +TEST(test_transaction, test_insert_with_file) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + Entity entity; + entity.SetRole("File"); + entity.SetLocalPath(TEST_DATA_DIR + "/test.json"); + + EXPECT_TRUE(transaction->upload_files.empty()); + transaction->InsertEntity(&entity); + EXPECT_EQ(transaction->upload_files.size(), 1); + + transaction->ExecuteAsynchronously(); + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR); +} + } // namespace caosdb::transaction -- GitLab