diff --git a/CMakeLists.txt b/CMakeLists.txt index 64fad3282a1796dda04745378feed7bb5c891754..f9c6718af235dad3d558d571c0e39dc76984ee69 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,7 @@ cmake_minimum_required(VERSION 3.13) -set(libcaosdb_VERSION 0.0.10) +set(libcaosdb_VERSION 0.0.11) set(libcaosdb_COMPATIBLE_SERVER_VERSION_MAJOR 0) set(libcaosdb_COMPATIBLE_SERVER_VERSION_MINOR 5) set(libcaosdb_COMPATIBLE_SERVER_VERSION_PATCH 0) diff --git a/README_SETUP.md b/README_SETUP.md index bdb8a03e4c58576b35e50b2090b889c120ae23cd..ea0284e2dd0458eee5aa7c9e165a3e6767e57d55 100644 --- a/README_SETUP.md +++ b/README_SETUP.md @@ -48,10 +48,9 @@ as compiler. We use [cmake](https://cmake.org/download/) as build tool. and build the project. (You can open Tools/Command Line/Developer Command Prompt and execute `msbuild libcaosdb.sln /property:Configuration=Release`) -### Creating a local Conan package ## +### Creating a Local Conan Package ## -Building and installing libcaosdb with Conan is just a single command: -`conan create . -s "compiler.libcxx=libstdc++11"` +Building and installing libcaosdb with Conan is just a single command: `make conan-install` For MacOS, you probably should adjust the option as mentioned above. @@ -65,6 +64,10 @@ the `--build=missing` option: `conan install .. [ other options ] --build=missing`. This should download and compile the sources of the dependencies. +#### cmake fails when using the Debug flag +Depending on the clang version it might be necessary to use additionally the following flag: +`-DCMAKE_CXX_FLAGS="-Wno-unused-parameter"` + ## Unit Tests ### Build diff --git a/conanfile.py b/conanfile.py index 0e31f8dfeaae2cd681690324611031bd3017ec67..300f652d76620d2cd6d4c698a0d651941daed5a4 100644 --- a/conanfile.py +++ b/conanfile.py @@ -3,7 +3,7 @@ from conans import ConanFile, CMake, tools class CaosdbConan(ConanFile): name = "caosdb" - version = "0.0.10" + version = "0.0.11" license = "AGPL-3.0-or-later" author = "Timm C. Fitschen <t.fitschen@indiscale.com>" url = "https://gitlab.indiscale.com/caosdb/src/caosdb-cpplib.git" diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 542ad975f5241ce7755994b57afcf53e1cdeefc7..af287eba3b9bb6719f9313411ecf8f0ddd42b11f 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -27,6 +27,7 @@ set(libcaosdb_INCL ${CMAKE_CURRENT_BINARY_DIR}/caosdb/constants.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/entity.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/exceptions.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/handler_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/info.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/log_level.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/logging.h @@ -34,8 +35,16 @@ set(libcaosdb_INCL ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/protobuf_helper.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/status_code.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_status.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/unary_rpc_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/utility.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/register_file_upload_handler.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/upload_request_handler.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/download_request_handler.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_writer.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_reader.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_error.h ) # pass variable to parent scope diff --git a/include/caosdb/connection.h b/include/caosdb/connection.h index 5d7930a0d63e484f01be52dab7749d02b3916c33..ef9597bb06cb5454160eb02938219a8ced564d17 100644 --- a/include/caosdb/connection.h +++ b/include/caosdb/connection.h @@ -45,6 +45,7 @@ using boost::filesystem::path; using caosdb::authentication::Authenticator; using caosdb::configuration::ConnectionConfiguration; using caosdb::entity::v1alpha1::EntityTransactionService; +using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::info::VersionInfo; using caosdb::info::v1alpha1::GeneralInfoService; using caosdb::transaction::Transaction; @@ -104,6 +105,9 @@ private: /// Service for entity transactions. We use a shared pointer because /// Transaction instances also own this service stub. std::shared_ptr<EntityTransactionService::Stub> entity_transaction_service; + /// Service for file transmission (download and upload). We use a shared + /// pointer because Transaction instances also own this service stub. + std::shared_ptr<FileTransmissionService::Stub> file_transmission_service; }; /** diff --git a/include/caosdb/entity.h b/include/caosdb/entity.h index 73e841c44461f596b98f8b4217c0c097de604122..622ea3c27830cf3f1e12f94f13aa6c878d9e9bd8 100644 --- a/include/caosdb/entity.h +++ b/include/caosdb/entity.h @@ -29,25 +29,49 @@ #ifndef CAOSDB_ENTITY_H #define CAOSDB_ENTITY_H -#include "caosdb/entity/v1alpha1/main.pb.h" // for ProtoEntity, ProtoParent... -#include "caosdb/message_code.h" // for get_message_code, Messag... -#include <google/protobuf/util/json_util.h> // for MessageToJsonString, Jso... -#include <google/protobuf/message.h> // for RepeatedPtrField, Message +#include "caosdb/entity/v1alpha1/main.pb.h" // for RepeatedPtrField +#include "caosdb/logging.h" // for CAOSDB_LOG_WARN +#include "caosdb/message_code.h" // for get_message_code +#include "caosdb/status_code.h" // for StatusCode +#include <boost/filesystem/operations.hpp> // for exists, is_di... +#include <boost/filesystem/path.hpp> // for path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <google/protobuf/message.h> // for RepeatedPtrField +#include <google/protobuf/util/json_util.h> // for MessageToJson... +#include <iosfwd> // for streamsize #include <iterator> // for iterator, output_iterato... #include <map> // for map -#include <stdexcept> // for out_of_range -#include <string> // for string +#include <random> // for mt19937, rand... +#include <stdexcept> // for out_of_range +#include <string> // for string, basic... namespace caosdb::entity { +using boost::filesystem::exists; +using boost::filesystem::is_directory; using caosdb::entity::v1alpha1::IdResponse; using ProtoParent = caosdb::entity::v1alpha1::Parent; using ProtoProperty = caosdb::entity::v1alpha1::Property; using ProtoEntity = caosdb::entity::v1alpha1::Entity; +using ProtoFileDescriptor = caosdb::entity::v1alpha1::FileDescriptor; using ProtoMessage = caosdb::entity::v1alpha1::Message; using ::google::protobuf::RepeatedPtrField; +using caosdb::StatusCode; +using caosdb::entity::v1alpha1::EntityResponse; +using caosdb::entity::v1alpha1::FileTransmissionId; +using google::protobuf::RepeatedPtrField; static const std::string logger_name = "caosdb::entity"; +struct FileDescriptor { + FileTransmissionId *file_transmission_id; + ProtoFileDescriptor *wrapped; + boost::filesystem::path local_path; +}; + /** * Abstract base class for Messages, Properties and Parents container classes. * @@ -521,12 +545,18 @@ public: this->wrapped->CopyFrom(*original.wrapped); }; explicit Entity(IdResponse *idResponse); - explicit inline Entity(ProtoEntity *wrapped) : wrapped(wrapped) { - errors.wrapped = this->wrapped->mutable_errors(); - warnings.wrapped = this->wrapped->mutable_warnings(); - infos.wrapped = this->wrapped->mutable_infos(); + explicit Entity(ProtoEntity *wrapped) : wrapped(wrapped) { properties.wrapped = this->wrapped->mutable_properties(); parents.wrapped = this->wrapped->mutable_parents(); + errors.wrapped = CreateMessagesField(); + warnings.wrapped = CreateMessagesField(); + infos.wrapped = CreateMessagesField(); + }; + explicit inline Entity(EntityResponse *response) + : Entity(response->release_entity()) { + errors.wrapped->Swap(response->mutable_errors()); + warnings.wrapped->Swap(response->mutable_warnings()); + infos.wrapped->Swap(response->mutable_infos()); }; [[nodiscard]] inline auto GetId() const noexcept -> const std::string & { @@ -605,10 +635,69 @@ public: */ auto CopyTo(ProtoEntity *target) -> void; -protected: + auto SetFilePath(const std::string &path) -> void; + inline auto HasFile() const -> bool { + return !this->file_descriptor.local_path.empty(); + } + auto SetFileTransmissionRegistrationId(const std::string ®istration_id) + -> void; + inline auto SetFileTransmissionId(FileTransmissionId *file_transmission_id) + -> void { + file_transmission_id->set_file_id(GetNextFileId()); + file_descriptor.file_transmission_id = file_transmission_id; + } + + inline auto GetFileDescriptor() -> FileDescriptor & { + return this->file_descriptor; + } + + inline auto GetLocalPath() const noexcept -> const boost::filesystem::path & { + return this->file_descriptor.local_path; + } + + inline auto SetLocalPath(const boost::filesystem::path &local_path) noexcept + -> StatusCode { + if (GetRole() != "File") { + CAOSDB_LOG_WARN(logger_name) + << "Entity::SetLocalPath failed. This is not a file entity."; + return StatusCode::NOT_A_FILE_ENTITY; + } + if (!exists(local_path)) { + CAOSDB_LOG_WARN(logger_name) + << "Entity::SetLocalPath failed. This file does not exists: " + << local_path.string(); + return StatusCode::FILE_DOES_NOT_EXIST_LOCALLY; + } + if (is_directory(local_path)) { + CAOSDB_LOG_WARN(logger_name) + << "Entity::SetLocalPath failed. This file is a directory: " + << local_path.string(); + return StatusCode::PATH_IS_A_DIRECTORY; + } + + CAOSDB_LOG_TRACE(logger_name) + << "Entity::SetLocalPath(" << local_path.string() << ");"; + this->file_descriptor.local_path = local_path; + return StatusCode::SUCCESS; + } + +private: + inline auto GetNextFileId() -> std::string { + std::string str = "0123456789abcdef"; + std::mt19937 generator(std::random_device{}()); + std::uniform_int_distribution<int> distribution(0, str.size() - 1); + std::string result(10, '\0'); + + for (auto &dis : result) { + dis = str[distribution(generator)]; + } + return result; + } static auto CreateProtoEntity() -> ProtoEntity *; + static auto CreateMessagesField() -> RepeatedPtrField<ProtoMessage> *; auto SetId(const std::string &id) -> void; auto SetVersionId(const std::string &id) -> void; + FileDescriptor file_descriptor; ProtoEntity *wrapped; Properties properties; Parents parents; diff --git a/include/caosdb/file_transmission/download_request_handler.h b/include/caosdb/file_transmission/download_request_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..c6ee1c36b67c637c8faad901f590a4eee318b7b5 --- /dev/null +++ b/include/caosdb/file_transmission/download_request_handler.h @@ -0,0 +1,121 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_FILE_TRANSMISSION_DOWNLOAD_REQUEST_HANDLER_H +#define CAOSDB_FILE_TRANSMISSION_DOWNLOAD_REQUEST_HANDLER_H + +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse +#include "caosdb/file_transmission/file_writer.h" // for FileWriter +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncReader +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <memory> // for unique_ptr + +namespace caosdb::transaction { +using caosdb::entity::FileDescriptor; +using caosdb::entity::v1alpha1::FileDownloadRequest; +using caosdb::entity::v1alpha1::FileDownloadResponse; +using caosdb::entity::v1alpha1::FileTransmissionService; +using caosdb::transaction::HandlerInterface; +using caosdb::transaction::HandlerTag; + +class DownloadRequestHandler final : public HandlerInterface { +public: + DownloadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, + FileDescriptor file_descriptor); + + ~DownloadRequestHandler() override = default; + + DownloadRequestHandler(const DownloadRequestHandler &) = delete; + DownloadRequestHandler &operator=(const DownloadRequestHandler &) = delete; + DownloadRequestHandler(DownloadRequestHandler &&) = delete; + DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete; + + void Start() override { OnNext(true); } + + bool OnNext(bool ok) override; + + void Cancel() override; + +protected: + enum class CallState { NewCall, SendingRequest, ReceivingFile, CallComplete }; + + void handleNewCallState(); + void handleSendingRequestState(); + void handleReceivingFileState(); + void handleCallCompleteState(); + + HandlerTag tag_; + + FileTransmissionService::Stub *stub_; + grpc::CompletionQueue *cq_; + grpc::ClientContext ctx_; + + std::unique_ptr<grpc::ClientAsyncReader<FileDownloadResponse>> rpc_; + + FileDownloadRequest *request_; + FileDownloadResponse *response_; + grpc::Status status_; + + CallState state_; + + std::unique_ptr<FileWriter> fileWriter_; + + FileDescriptor file_descriptor_; + + unsigned long long bytesReceived_; +}; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/file_transmission/file_error.h b/include/caosdb/file_transmission/file_error.h new file mode 100644 index 0000000000000000000000000000000000000000..ae057be3ba47f894f8fdbc22adb6e6d56accdafd --- /dev/null +++ b/include/caosdb/file_transmission/file_error.h @@ -0,0 +1,64 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_FILE_TRANSMISSION_FILE_ERROR_H +#define CAOSDB_FILE_TRANSMISSION_FILE_ERROR_H + +#include <stdexcept> +#include <string> + +namespace caosdb::transaction { + +class FileIOError : public std::runtime_error { +public: + FileIOError(const std::string &message) : std::runtime_error(message) {} +}; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/file_transmission/file_reader.h b/include/caosdb/file_transmission/file_reader.h new file mode 100644 index 0000000000000000000000000000000000000000..67c1247fa97cc43e28064b4e0da81812f41b231a --- /dev/null +++ b/include/caosdb/file_transmission/file_reader.h @@ -0,0 +1,89 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_FILE_TRANSMISSION_FILE_READER_H +#define CAOSDB_FILE_TRANSMISSION_FILE_READER_H + +#include <boost/filesystem/fstream.hpp> // for ifstream +#include <boost/filesystem/operations.hpp> // for exists +#include <boost/filesystem/path.hpp> // for path +#include <fstream> // for ifstream, size_t +#include <string> // for string + +namespace caosdb::transaction { +using boost::filesystem::exists; +using boost::filesystem::ifstream; +using boost::filesystem::path; + +class FileReader final { +public: + FileReader(boost::filesystem::path filename); + + ~FileReader() = default; + + FileReader(const FileReader &) = delete; + FileReader &operator=(const FileReader &) = delete; + + FileReader(FileReader &&) = default; + FileReader &operator=(FileReader &&) = default; + + unsigned long long fileSize() const { return size_; } + + std::size_t read(std::string &buffer); + +private: + void openFile(); + + std::ifstream stream_; + boost::filesystem::path filename_; + unsigned long long size_; +}; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/file_transmission/file_writer.h b/include/caosdb/file_transmission/file_writer.h new file mode 100644 index 0000000000000000000000000000000000000000..801d74b9547951d2a3b86ed4b333bfb4b7035aa9 --- /dev/null +++ b/include/caosdb/file_transmission/file_writer.h @@ -0,0 +1,81 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_FILE_TRANSMISSION_FILE_WRITER_H +#define CAOSDB_FILE_TRANSMISSION_FILE_WRITER_H + +#include <boost/filesystem/path.hpp> // for path +#include <fstream> // for ofstream +#include <string> // for string + +namespace caosdb::transaction { + +class FileWriter final { +public: + FileWriter(boost::filesystem::path filename); + + ~FileWriter() = default; + + FileWriter(const FileWriter &) = delete; + FileWriter &operator=(const FileWriter &) = delete; + + FileWriter(FileWriter &&) = default; + FileWriter &operator=(FileWriter &&) = default; + + void write(const std::string &buffer); + +private: + void openFile(); + + std::ofstream stream_; + boost::filesystem::path filename_; +}; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/file_transmission/register_file_upload_handler.h b/include/caosdb/file_transmission/register_file_upload_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..17a42c0a2a2cf1d591048abb6e4c8b329bfd008c --- /dev/null +++ b/include/caosdb/file_transmission/register_file_upload_handler.h @@ -0,0 +1,97 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_FILE_TRANSMISSION_REGISTER_FILE_UPLOAD_H +#define CAOSDB_FILE_TRANSMISSION_REGISTER_FILE_UPLOAD_H + +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include "caosdb/unary_rpc_handler.h" +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRespons... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <memory> // for unique_ptr + +namespace caosdb::transaction { + +using caosdb::entity::v1alpha1::FileTransmissionService; +using caosdb::entity::v1alpha1::RegisterFileUploadRequest; +using caosdb::entity::v1alpha1::RegisterFileUploadResponse; + +class RegisterFileUploadHandler final : public UnaryRpcHandler { +public: + RegisterFileUploadHandler(HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *completion_queue, + RegisterFileUploadRequest *request, + RegisterFileUploadResponse *response); + + ~RegisterFileUploadHandler(); + + RegisterFileUploadHandler(const RegisterFileUploadHandler &) = delete; + RegisterFileUploadHandler & + operator=(const RegisterFileUploadHandler &) = delete; + RegisterFileUploadHandler(RegisterFileUploadHandler &&) = delete; + RegisterFileUploadHandler &operator=(RegisterFileUploadHandler &&) = delete; + +protected: + void handleNewCallState() override; + + HandlerTag tag_; + + FileTransmissionService::Stub *stub_; + + std::unique_ptr<grpc::ClientAsyncResponseReader<RegisterFileUploadResponse>> + rpc_; + + RegisterFileUploadRequest *request_; + RegisterFileUploadResponse *response_; +}; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/file_transmission/upload_request_handler.h b/include/caosdb/file_transmission/upload_request_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..54621d12ba7d67d374037ecdb1f1259662071319 --- /dev/null +++ b/include/caosdb/file_transmission/upload_request_handler.h @@ -0,0 +1,129 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_FILE_TRANSMISSION_UPLOAD_REQUEST_HANDLER_H +#define CAOSDB_FILE_TRANSMISSION_UPLOAD_REQUEST_HANDLER_H + +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileUploadRequest +#include "caosdb/file_transmission/file_reader.h" // for FileReader +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include <cstdint> // for uint64_t +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWriter +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <memory> // for unique_ptr + +namespace caosdb::transaction { +using caosdb::entity::FileDescriptor; +using caosdb::entity::v1alpha1::FileTransmissionService; +using caosdb::entity::v1alpha1::FileUploadRequest; +using caosdb::entity::v1alpha1::FileUploadResponse; +using caosdb::transaction::HandlerInterface; +using caosdb::transaction::HandlerTag; + +class UploadRequestHandler final : public HandlerInterface { +public: + UploadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, + FileDescriptor file_descriptor); + + ~UploadRequestHandler() override = default; + + UploadRequestHandler(const UploadRequestHandler &) = delete; + UploadRequestHandler &operator=(const UploadRequestHandler &) = delete; + UploadRequestHandler(UploadRequestHandler &&) = delete; + UploadRequestHandler &operator=(UploadRequestHandler &&) = delete; + + void Start() override { OnNext(true); } + + bool OnNext(bool ok) override; + + void Cancel() override; + +protected: + enum class CallState { + NewCall, + SendingHeader, + SendingFile, + ExpectingResponse, + CallComplete + }; + + void handleNewCallState(); + void handleSendingHeaderState(); + void handleSendingFileState(); + void handleExpectingResponseState(); + void handleCallCompleteState(); + + HandlerTag tag_; + + FileTransmissionService::Stub *stub_; + grpc::CompletionQueue *cq_; + grpc::ClientContext ctx_; + + std::unique_ptr<grpc::ClientAsyncWriter<FileUploadRequest>> rpc_; + + FileUploadRequest *request_; + FileUploadResponse *response_; + grpc::Status status_; + + CallState state_; + + std::unique_ptr<FileReader> fileReader_; + + FileDescriptor file_descriptor_; + + uint64_t bytesToSend_; +}; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/handler_interface.h b/include/caosdb/handler_interface.h new file mode 100644 index 0000000000000000000000000000000000000000..4ba563a300175478c8140e9b88c991e2f5321245 --- /dev/null +++ b/include/caosdb/handler_interface.h @@ -0,0 +1,83 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_HANDLER_INTERFACE_H +#define CAOSDB_HANDLER_INTERFACE_H + +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <memory> +#include <string> + +namespace caosdb::transaction { + +const static std::string logger_name = "caosdb::transaction"; + +class HandlerInterface { +public: + HandlerInterface() : transaction_status(TransactionStatus::READY()) {} + + virtual ~HandlerInterface() = default; + + virtual void Start() = 0; + + virtual bool OnNext(bool ok) = 0; + + virtual void Cancel() = 0; + + inline TransactionStatus GetStatus() { return this->transaction_status; } + +protected: + TransactionStatus transaction_status; +}; + +using HandlerPtr = std::unique_ptr<HandlerInterface>; +using HandlerTag = HandlerPtr *; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/protobuf_helper.h b/include/caosdb/protobuf_helper.h index ce7dd5ef7f58ce0fb1e72df08367426deb70cfca..bd3395f4de7e8cd586a613869e84b1d5ffeb729b 100644 --- a/include/caosdb/protobuf_helper.h +++ b/include/caosdb/protobuf_helper.h @@ -22,7 +22,15 @@ #ifndef CAOSDB_PROTOBUF_HELPER_H #define CAOSDB_PROTOBUF_HELPER_H -#include <google/protobuf/arena.h> +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/extension_set.h> // for Arena + +#define CAOSDB_DEBUG_MESSAGE_STRING(message, out) \ + std::string out; \ + { \ + google::protobuf::util::JsonOptions options; \ + google::protobuf::util::MessageToJsonString(message, &out, options); \ + } namespace caosdb::utility { diff --git a/include/caosdb/status_code.h b/include/caosdb/status_code.h index 1dc8030b7d36a71e4b05758e22ccd5c829ef3f95..f996901581f10d3fdadc0137b3b0a3db74348036 100644 --- a/include/caosdb/status_code.h +++ b/include/caosdb/status_code.h @@ -41,6 +41,7 @@ enum StatusCode { INITIAL = -2, EXECUTING = -1, SUCCESS = 0, + // TODO(tf) Map other GRPC errors AUTHENTICATION_ERROR = 16, CONNECTION_ERROR = 14, GENERIC_RPC_ERROR = 20, @@ -53,6 +54,11 @@ enum StatusCode { UNSUPPORTED_FEATURE = 27, ORIGINAL_ENTITY_MISSING_ID = 28, EXTERN_C_ASSIGNMENT_ERROR = 29, + PATH_IS_A_DIRECTORY = 30, + FILE_DOES_NOT_EXIST_LOCALLY = 31, + FILE_UPLOAD_ERROR = 32, + FILE_DOWNLOAD_ERROR = 33, + NOT_A_FILE_ENTITY = 34, OTHER_CLIENT_ERROR = 9999, }; diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index eed8f1c1e0a495fc0f29f19e08f3aa142dae57b2..368eb2e10d311ce862cce09c8c417c54da379a5a 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -20,22 +20,30 @@ */ #ifndef CAOSDB_TRANSACTION_H #define CAOSDB_TRANSACTION_H -#include "boost/log/core/record.hpp" // for record -#include "boost/log/sources/record_ostream.hpp" // for basic_record_o... -#include "boost/preprocessor/seq/limits/enum_256.hpp" // for BOOST_PP_SEQ_E... -#include "boost/preprocessor/seq/limits/size_256.hpp" // for BOOST_PP_SEQ_S... -#include "caosdb/entity.h" // for Entity -#include "caosdb/logging.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionSe... -#include "caosdb/entity/v1alpha1/main.pb.h" // for Entity, RetrieveReq... -#include "caosdb/transaction_status.h" // for TransactionStatus + +#include "caosdb/entity.h" // for Entity, FileDe... +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransact... +#include "caosdb/entity/v1alpha1/main.pb.h" // for MultiTransacti... +#include "caosdb/handler_interface.h" // for HandlerInterface +#include "caosdb/transaction_handler.h" // for EntityTransactionHandler +#include "caosdb/logging.h" // for CAOSDB_LOG_ERR... +#include "caosdb/protobuf_helper.h" // for get_arena #include "caosdb/status_code.h" // for StatusCode -#include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso... -#include <iterator> +#include "caosdb/transaction_status.h" // for StatusCode +#include <boost/log/core/record.hpp> // for record +#include <boost/log/sources/record_ostream.hpp> // for basic_record_o... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_E... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_S... +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/util/json_util.h> // for MessageToJsonS... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <iterator> // for iterator, next +#include <map> // for map // IWYU pragma: no_include <ext/alloc_traits.h> -#include <memory> // for shared_ptr, unique_ptr -#include <string> // for string -#include <vector> // for vector +#include <memory> // for unique_ptr +#include <string> // for string +#include <utility> // for move +#include <vector> // for vector /** * Do all necessary checks and assure that another retrieval (by id or by @@ -157,19 +165,28 @@ */ namespace caosdb::transaction { using caosdb::entity::Entity; -using ProtoEntity = caosdb::entity::v1alpha1::Entity; +using caosdb::entity::FileDescriptor; +using caosdb::entity::v1alpha1::EntityResponse; using caosdb::entity::v1alpha1::EntityTransactionService; +using caosdb::entity::v1alpha1::FileDownloadRequest; +using caosdb::entity::v1alpha1::FileDownloadResponse; +using caosdb::entity::v1alpha1::FileTransmissionId; +using caosdb::entity::v1alpha1::FileTransmissionService; +using caosdb::entity::v1alpha1::FileUploadRequest; +using caosdb::entity::v1alpha1::FileUploadResponse; using caosdb::entity::v1alpha1::IdResponse; using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionResponse; +using caosdb::entity::v1alpha1::RegisterFileUploadRequest; +using caosdb::entity::v1alpha1::RegisterFileUploadResponse; using caosdb::transaction::TransactionStatus; -using WrappedResponseCase = - caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase; +using TransactionResponseCase = + caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; +using caosdb::utility::get_arena; +using google::protobuf::Arena; class Transaction; -static const std::string logger_name = "caosdb::transaction"; - /** * Abstract base class for the results of a Transaction. */ @@ -199,24 +216,34 @@ private: }; }; -/** - * Container with results of a transaction. - */ -class MultiResultSet : public ResultSet { +class AbstractMultiResultSet : public ResultSet { public: - ~MultiResultSet() = default; - explicit MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set); + virtual ~AbstractMultiResultSet() = default; + inline explicit AbstractMultiResultSet( + std::vector<std::unique_ptr<Entity>> result_set) + : items(std::move(result_set)) {} [[nodiscard]] inline auto size() const noexcept -> int override { - return this->entities.size(); + return this->items.size(); } [[nodiscard]] inline auto at(const int index) const -> const Entity & override { - return *(this->entities.at(index)); + return *(this->items.at(index)); } [[nodiscard]] inline auto mutable_at(int index) const -> Entity * override { - return this->entities.at(index).get(); + return this->items.at(index).get(); } - std::vector<std::unique_ptr<Entity>> entities; + +protected: + std::vector<std::unique_ptr<Entity>> items; +}; + +/** + * Container with results of a transaction. + */ +class MultiResultSet : public AbstractMultiResultSet { +public: + ~MultiResultSet() = default; + explicit MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set); }; /** @@ -232,15 +259,36 @@ public: * yet. */ enum TransactionType { - NONE, /// Unspecified or not specified yet. - READ_ONLY, /// Only retrievals (by id, by query) - INSERT, /// Only insertions - UPDATE, /// Only updates - DELETE, /// Only deletions - MIXED_WRITE, /// Only insertions, deletions, updates - MIXED_READ_AND_WRITE /// all kind of transaction. + NONE, //!< Unspecified or not specified yet. + READ_ONLY, //!< Only retrievals (by id, by query) + INSERT, //!< Only insertions + UPDATE, //!< Only updates + DELETE, //!< Only deletions + MIXED_WRITE, //!< Only insertions, deletions, updates + MIXED_READ_AND_WRITE //!< all kind of transaction. }; - Transaction(std::shared_ptr<EntityTransactionService::Stub> service_stub); + Transaction(std::shared_ptr<EntityTransactionService::Stub> entity_service, + std::shared_ptr<FileTransmissionService::Stub> file_service); + + ~Transaction(); + + Transaction(const Transaction &) = delete; + Transaction &operator=(const Transaction &) = delete; + Transaction(Transaction &&) = delete; + Transaction &operator=(Transaction &&) = delete; + + /** + * Add an entity id to this transaction for retrieval and also download the + * file. + * + * If the entity doesn't have a file a warning is appended. + * + * If the file cannot be downloaded due to unsufficient permissions an error + * is appended. + */ + auto RetrieveAndDownloadFilesById(const std::string &id, + const std::string &local_path) noexcept + -> StatusCode; /** * Add an entity id to this transaction for retrieval. @@ -321,13 +369,12 @@ public: /** * Return the current status of the transaction. */ - [[nodiscard]] inline auto GetStatus() const -> TransactionStatus { + [[nodiscard]] inline auto GetStatus() const noexcept -> TransactionStatus { return this->status; } - [[nodiscard]] inline auto GetResultSet() const -> const ResultSet & { - const ResultSet *result_set = this->result_set.get(); - return *result_set; + [[nodiscard]] inline auto GetResultSet() const noexcept -> const ResultSet & { + return *(this->result_set.get()); } /** @@ -337,7 +384,7 @@ public: * this transaction. In all other cases, the return value will be * -1. */ - [[nodiscard]] inline auto GetCountResult() const -> long { + [[nodiscard]] inline auto GetCountResult() const noexcept -> long { return query_count; } @@ -375,12 +422,56 @@ public: return out; } + /** + * Return the vector which holds all the files which are to be uploaded. + */ + inline auto GetUploadFiles() const -> const std::vector<FileDescriptor> & { + return upload_files; + } + +protected: + /** + * Await and process the current handler's results. + */ + auto ProcessCalls() -> TransactionStatus; + + /** + * Cancels any active handler and drains the completion_queue. + * + * Can stay protected until ExecuteAsynchronously() is actually asynchronous. + * Then it is also intended for aborting an execution after it has already + * started. + */ + auto Cancel() -> void; + + /** + * Return the Arena where this transaction may create Message instances. + * + * Currently, this implementation is only a call to + * caosdb::utility::get_arena(), but in the future we might want to have a + * smarter memory management. + */ + inline auto GetArena() const -> Arena * { return get_arena(); } + private: + grpc::CompletionQueue completion_queue; + std::unique_ptr<HandlerInterface> handler_; + + std::vector<FileDescriptor> upload_files; + std::map<std::string, FileDescriptor> download_files; + + // auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void; + auto UploadFile(FileUploadResponse *response, + const FileDescriptor &file_descriptor, + const std::string ®istration_id) -> void; + auto DownloadFile(FileDownloadResponse *response, + const FileTransmissionId &file_transmission_id) -> void; bool has_query = false; TransactionType transaction_type = TransactionType::NONE; mutable std::unique_ptr<ResultSet> result_set; mutable TransactionStatus status = TransactionStatus::INITIAL(); - std::shared_ptr<EntityTransactionService::Stub> service_stub; + std::shared_ptr<EntityTransactionService::Stub> entity_service; + std::shared_ptr<FileTransmissionService::Stub> file_service; MultiTransactionRequest *request; mutable MultiTransactionResponse *response; std::string error_message; diff --git a/include/caosdb/transaction_handler.h b/include/caosdb/transaction_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..0a7154d03dfbde19284807d1e1f40998c89e1de0 --- /dev/null +++ b/include/caosdb/transaction_handler.h @@ -0,0 +1,45 @@ +#pragma once +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse +#include "caosdb/handler_interface.h" // for HandlerTag +#include "caosdb/unary_rpc_handler.h" // for HandlerTag, Handl... +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRespons... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <memory> // for unique_ptr + +namespace caosdb::transaction { + +using caosdb::entity::v1alpha1::EntityTransactionService; +using caosdb::entity::v1alpha1::MultiTransactionRequest; +using caosdb::entity::v1alpha1::MultiTransactionResponse; + +class EntityTransactionHandler final : public UnaryRpcHandler { +public: + EntityTransactionHandler(HandlerTag tag, EntityTransactionService::Stub *stub, + grpc::CompletionQueue *completion_queue, + MultiTransactionRequest *request, + MultiTransactionResponse *response); + + ~EntityTransactionHandler() override = default; + + EntityTransactionHandler(const EntityTransactionHandler &) = delete; + EntityTransactionHandler & + operator=(const EntityTransactionHandler &) = delete; + EntityTransactionHandler(EntityTransactionHandler &&) = delete; + EntityTransactionHandler &operator=(EntityTransactionHandler &&) = delete; + +protected: + virtual void handleNewCallState() override; + + HandlerTag tag_; + + EntityTransactionService::Stub *stub_; + + std::unique_ptr<grpc::ClientAsyncResponseReader<MultiTransactionResponse>> + rpc_; + + MultiTransactionRequest *request_; + MultiTransactionResponse *response_; +}; + +} // namespace caosdb::transaction diff --git a/include/caosdb/transaction_status.h b/include/caosdb/transaction_status.h index 6f2f5cefd7b36f8c986726b4955f53381339f0a2..ca40c578bc1c3dd2555ccda97c06b4da726e9c7b 100644 --- a/include/caosdb/transaction_status.h +++ b/include/caosdb/transaction_status.h @@ -119,6 +119,22 @@ public: caosdb::get_status_description(StatusCode::AUTHENTICATION_ERROR) + " Original error: " + details); } + /** + * Factory for a FILE_UPLOAD_ERROR status. + * + * This status means that the transaction failed during the upload of the + * file blobs of file entities. + */ + CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(FILE_UPLOAD_ERROR, + StatusCode::FILE_UPLOAD_ERROR); + /** + * Factory for a FILE_DOWN_ERROR status. + * + * This status means that the transaction failed during the download of the + * file blobs of file entities. + */ + CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(FILE_DOWNLOAD_ERROR, + StatusCode::FILE_DOWNLOAD_ERROR); /** * Factory for a TRANSACTION_ERROR status. * @@ -154,6 +170,19 @@ public: " Original error: " + details); } + /** + * Factory for a GENERIC_ERROR status. + * + * This status means that the transaction failed due to errors which + * supposedly do not have a special handling. + */ + inline static auto GENERIC_ERROR(const std::string &details) { + return TransactionStatus( + StatusCode::GENERIC_ERROR, + caosdb::get_status_description(StatusCode::GENERIC_ERROR) + + "Original error: " + details); + } + inline auto ThrowExceptionIfError() const -> void { TransactionStatus::ThrowExceptionIfError(this->code, this->description); } @@ -176,7 +205,7 @@ public: case StatusCode::TRANSACTION_TYPE_ERROR: throw TransactionTypeError(description); default: - throw Exception(StatusCode::GENERIC_ERROR, description); + throw Exception(code, description); } } @@ -211,6 +240,9 @@ public: */ inline auto GetCode() const -> StatusCode { return this->code; } + TransactionStatus(StatusCode code, const std::string &description) + : code(code), description(description){}; + private: /** * The code is an identifier of errors. @@ -221,9 +253,6 @@ private: * Description of the error */ std::string description; - - TransactionStatus(StatusCode code, const std::string &description) - : code(code), description(description){}; }; } // namespace caosdb::transaction diff --git a/include/caosdb/unary_rpc_handler.h b/include/caosdb/unary_rpc_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..6955504a2baea38cb0bd8075bdca2d20c9f8a52a --- /dev/null +++ b/include/caosdb/unary_rpc_handler.h @@ -0,0 +1,89 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_UNARY_RPC_HANDLER_H +#define CAOSDB_UNARY_RPC_HANDLER_H + +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status + +namespace caosdb::transaction { + +class UnaryRpcHandler : public HandlerInterface { +public: + inline UnaryRpcHandler(grpc::CompletionQueue *completion_queue) + : HandlerInterface(), state_(CallState::NewCall), + completion_queue(completion_queue) {} + + void Start() override { + transaction_status = TransactionStatus::EXECUTING(); + OnNext(true); + } + + bool OnNext(bool ok) override; + + void Cancel() override; + +protected: + virtual void handleNewCallState() = 0; + void handleCallCompleteState(); + + enum class CallState { NewCall, CallComplete }; + CallState state_; + grpc::CompletionQueue *completion_queue; + + grpc::ClientContext call_context; + grpc::Status status_; +}; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/utility.h b/include/caosdb/utility.h index 30e21747ec4ce39493e24e10e09d7c53c7ab1308..dd53ff952b9737fe76222d998b6813e744b0ad3d 100644 --- a/include/caosdb/utility.h +++ b/include/caosdb/utility.h @@ -33,6 +33,8 @@ #include <memory> #include <string> #include <string_view> +#include <mutex> +#include <shared_mutex> namespace caosdb::utility { using boost::filesystem::exists; diff --git a/proto b/proto index 45d120f78f17986ba67c10b6a4a130e7fa60b34c..485173a714d9ff7c2388945b0a4cad35980cda69 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit 45d120f78f17986ba67c10b6a4a130e7fa60b34c +Subproject commit 485173a714d9ff7c2388945b0a4cad35980cda69 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 95b2dabac276216bd744ea347a71f986070fce8f..89180ee81fc6b07d4f081c90fd1200530a2d60b6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -28,6 +28,13 @@ set(libcaosdb_SRC ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/configuration.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/protobuf_helper.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_handler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/unary_rpc_handler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/register_file_upload_handler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/upload_request_handler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/download_request_handler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_writer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/file_reader.cpp ) # pass variable to parent scope diff --git a/src/caosdb/connection.cpp b/src/caosdb/connection.cpp index 4ff50d24d329fa6ece3a8e87d304378f3d157782..b43444ce085d262a0ed8e960001e31c8da1a0124 100644 --- a/src/caosdb/connection.cpp +++ b/src/caosdb/connection.cpp @@ -37,6 +37,7 @@ namespace caosdb::connection { using caosdb::configuration::ConfigurationManager; using caosdb::configuration::ConnectionConfiguration; using caosdb::entity::v1alpha1::EntityTransactionService; +using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::info::VersionInfo; using caosdb::info::v1alpha1::GeneralInfoService; using caosdb::info::v1alpha1::GetVersionInfoRequest; @@ -52,6 +53,8 @@ Connection::Connection(const ConnectionConfiguration &configuration) { this->general_info_service = GeneralInfoService::NewStub(this->channel); this->entity_transaction_service = std::make_shared<EntityTransactionService::Stub>(this->channel); + this->file_transmission_service = + std::make_shared<FileTransmissionService::Stub>(this->channel); } auto Connection::RetrieveVersionInfoNoExceptions() const noexcept @@ -95,8 +98,9 @@ auto Connection::RetrieveVersionInfo() const -> const VersionInfo & { [[nodiscard]] auto Connection::CreateTransaction() const -> std::unique_ptr<Transaction> { - auto service_stub = this->entity_transaction_service; - return std::make_unique<Transaction>(service_stub); + auto entity_service = this->entity_transaction_service; + auto file_service = this->file_transmission_service; + return std::make_unique<Transaction>(entity_service, file_service); } auto ConnectionManager::mHasConnection(const std::string &name) const -> bool { diff --git a/src/caosdb/entity.cpp b/src/caosdb/entity.cpp index 74809f6d0004bae268692e03e604149f00fababe..8f410a3c6c454375fe827ba8058e20423ef9ccce 100644 --- a/src/caosdb/entity.cpp +++ b/src/caosdb/entity.cpp @@ -20,16 +20,20 @@ * */ #include "caosdb/entity.h" -#include "caosdb/entity/v1alpha1/main.pb.h" // for Parent, Arena::CreateMay... +#include "caosdb/entity/v1alpha1/main.pb.h" // for RepeatedPtrField, Property #include "caosdb/protobuf_helper.h" // for get_arena -#include "google/protobuf/arena.h" // for Arena +#include <google/protobuf/arena.h> // for Arena +#include <new> // for operator new namespace caosdb::entity { using caosdb::entity::v1alpha1::IdResponse; using ProtoParent = caosdb::entity::v1alpha1::Parent; using ProtoProperty = caosdb::entity::v1alpha1::Property; using ProtoEntity = caosdb::entity::v1alpha1::Entity; +using ProtoMessage = caosdb::entity::v1alpha1::Message; +using ProtoFileDescriptor = caosdb::entity::v1alpha1::FileDescriptor; using caosdb::utility::get_arena; +using google::protobuf::Arena; Messages::~Messages() = default; @@ -42,7 +46,7 @@ Parent::Parent() : wrapped(Parent::CreateProtoParent()) { } auto Parent::CreateProtoParent() -> ProtoParent * { - return google::protobuf::Arena::CreateMessage<ProtoParent>(get_arena()); + return Arena::CreateMessage<ProtoParent>(get_arena()); } auto Parent::SetName(const std::string &name) -> void { @@ -66,7 +70,7 @@ auto Parent::SetId(const std::string &id) -> void { this->wrapped->set_id(id); } Property::Property() : wrapped(Property::CreateProtoProperty()) {} auto Property::CreateProtoProperty() -> ProtoProperty * { - return google::protobuf::Arena::CreateMessage<ProtoProperty>(get_arena()); + return Arena::CreateMessage<ProtoProperty>(get_arena()); } [[nodiscard]] auto Property::GetId() const -> const std::string & { @@ -148,23 +152,20 @@ auto Entity::RemoveProperty(int index) -> void { } auto Entity::CreateProtoEntity() -> ProtoEntity * { - return google::protobuf::Arena::CreateMessage<ProtoEntity>(get_arena()); + return Arena::CreateMessage<ProtoEntity>(get_arena()); } -Entity::Entity() : wrapped(Entity::CreateProtoEntity()) { - properties.wrapped = this->wrapped->mutable_properties(); - parents.wrapped = this->wrapped->mutable_parents(); - errors.wrapped = this->wrapped->mutable_errors(); - warnings.wrapped = this->wrapped->mutable_warnings(); - infos.wrapped = this->wrapped->mutable_infos(); +auto Entity::CreateMessagesField() -> RepeatedPtrField<ProtoMessage> * { + return Arena::CreateMessage<RepeatedPtrField<ProtoMessage>>(get_arena()); } +Entity::Entity() : Entity(Entity::CreateProtoEntity()) {} + Entity::Entity(IdResponse *idResponse) : Entity() { this->wrapped->set_id(idResponse->id()); - this->wrapped->mutable_errors()->Swap(idResponse->mutable_entity_errors()); - this->wrapped->mutable_warnings()->Swap( - idResponse->mutable_entity_warnings()); - this->wrapped->mutable_infos()->Swap(idResponse->mutable_entity_infos()); + this->errors.wrapped->Swap(idResponse->mutable_errors()); + this->warnings.wrapped->Swap(idResponse->mutable_warnings()); + this->infos.wrapped->Swap(idResponse->mutable_infos()); } auto Entity::SetId(const std::string &id) -> void { this->wrapped->set_id(id); } @@ -201,4 +202,8 @@ auto Entity::SetDatatype(const std::string &datatype) -> void { this->wrapped->set_datatype(datatype); } +auto Entity::SetFilePath(const std::string &path) -> void { + this->wrapped->mutable_file_descriptor()->set_path(path); +} + } // namespace caosdb::entity diff --git a/src/caosdb/file_transmission/download_request_handler.cpp b/src/caosdb/file_transmission/download_request_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fb36ca5b46cbf2c4f08c2c06bc9faf6b00e038be --- /dev/null +++ b/src/caosdb/file_transmission/download_request_handler.cpp @@ -0,0 +1,207 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#include "caosdb/file_transmission/download_request_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for GENERIC_RPC_E... +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <exception> // IWYU pragma: keep +// IWYU pragma: no_include <bits/exception.h> +#include <google/protobuf/arena.h> // for Arena +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncRe... +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT... +#include <iostream> // for char_traits +#include <stdexcept> // for runtime_error +#include <string> // for string, opera... +#include <utility> // for move + +namespace caosdb::transaction { +using caosdb::StatusCode; +using caosdb::utility::get_arena; +using google::protobuf::Arena; + +DownloadRequestHandler::DownloadRequestHandler( + HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, FileDescriptor file_descriptor) + : tag_(tag), stub_(stub), cq_(cq), + request_(Arena::CreateMessage<FileDownloadRequest>(get_arena())), + response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())), + state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), + bytesReceived_(0) {} + +bool DownloadRequestHandler::OnNext(bool ok) { + try { + if (ok) { + if (state_ == CallState::NewCall) { + this->handleNewCallState(); + } else if (state_ == CallState::SendingRequest) { + this->handleSendingRequestState(); + } else if (state_ == CallState::ReceivingFile) { + this->handleReceivingFileState(); + } else if (state_ == CallState::CallComplete) { + this->handleCallCompleteState(); + return false; + } + } else { + state_ = CallState::CallComplete; + rpc_->Finish(&status_, tag_); + } + + return true; + } catch (std::exception &e) { + CAOSDB_LOG_ERROR(logger_name) + << "DownloadRequestHandler caught an exception: " << e.what(); + transaction_status = TransactionStatus::GENERIC_ERROR(e.what()); + state_ = CallState::CallComplete; + } catch (...) { + CAOSDB_LOG_ERROR(logger_name) + << "Transaction error: unknown exception caught"; + transaction_status = TransactionStatus::GENERIC_ERROR( + "DownloadRequestHandler caught an unknown exception"); + state_ = CallState::CallComplete; + } + + if (state_ == CallState::NewCall) { + return false; + } + + ctx_.TryCancel(); + + return true; +} + +void DownloadRequestHandler::Cancel() { ctx_.TryCancel(); } + +void DownloadRequestHandler::handleNewCallState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleNewCallState. local_path = " + << file_descriptor_.local_path + << ", download_id = " << file_descriptor_.file_transmission_id; + fileWriter_ = std::make_unique<FileWriter>(file_descriptor_.local_path); + + request_->mutable_file_transmission_id()->CopyFrom( + *(file_descriptor_.file_transmission_id)); + + rpc_ = stub_->PrepareAsyncFileDownload(&ctx_, *request_, cq_); + + transaction_status = TransactionStatus::EXECUTING(); + state_ = CallState::SendingRequest; + rpc_->StartCall(tag_); + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleNewCallState"; +} + +void DownloadRequestHandler::handleSendingRequestState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleSendingRequestState"; + state_ = CallState::ReceivingFile; + rpc_->Read(response_, tag_); + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleSendingRequestState"; +} + +void DownloadRequestHandler::handleReceivingFileState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleReceivingFileState"; + if (response_->has_chunk()) { + const auto &chunkData = response_->chunk().data(); + if (chunkData.empty()) { + CAOSDB_LOG_DEBUG(logger_name) << "Received an empty FileChunk, ignoring"; + } else { + fileWriter_->write(chunkData); + bytesReceived_ += chunkData.size(); + } + + state_ = CallState::ReceivingFile; + response_->Clear(); + rpc_->Read(response_, tag_); + } else { + throw std::runtime_error("File chunk expected"); + } + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleReceivingFileState"; +} + +void DownloadRequestHandler::handleCallCompleteState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter DownloadRequestHandler::handleCallCompleteState"; + + switch (status_.error_code()) { + case grpc::OK: { + CAOSDB_LOG_INFO(logger_name) + << "DownloadRequestHandler finished successfully (" + << file_descriptor_.local_path << "): Download complete, " + << bytesReceived_ << " bytes received."; + } break; + default: { + auto code(static_cast<StatusCode>(status_.error_code())); + std::string description(get_status_description(code) + + " Original message: " + status_.error_message()); + transaction_status = TransactionStatus(code, description); + CAOSDB_LOG_ERROR(logger_name) + << "DownloadRequestHandler finished with an error (" + << file_descriptor_.local_path << "): Download aborted with code " << code + << " - " << description; + } break; + } + + CAOSDB_LOG_TRACE(logger_name) + << "Leave DownloadRequestHandler::handleCallCompleteState"; +} + +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/file_reader.cpp b/src/caosdb/file_transmission/file_reader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2df58c9accbde99c9bc908ed7f80ceef7685b573 --- /dev/null +++ b/src/caosdb/file_transmission/file_reader.cpp @@ -0,0 +1,91 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#include "caosdb/file_transmission/file_reader.h" +#include "caosdb/file_transmission/file_error.h" // for FileIOError +#include <boost/filesystem/path.hpp> // for path +#include <utility> // for move + +namespace caosdb::transaction { + +FileReader::FileReader(boost::filesystem::path filename) + : filename_(std::move(filename)), size_(0) { + this->openFile(); +} + +void FileReader::openFile() { + stream_.open(filename_, std::ios::binary | std::ios::ate); + if (!stream_) { + throw FileIOError("Can't open file for reading: " + filename_.string()); + } + + auto size = stream_.tellg(); + stream_.seekg(0); + if (size > 0) { + size_ = static_cast<decltype(size_)>(size); + } +} + +std::size_t FileReader::read(std::string &buffer) { + std::size_t bytesRead = 0; + + if (!stream_.eof()) { + auto bufferSize = buffer.size(); + if (bufferSize > 0) { + if (!stream_.read(&buffer[0], bufferSize)) { + throw FileIOError("Can't read file: " + filename_.string()); + } + + bytesRead = static_cast<std::size_t>(stream_.gcount()); + } + } + + return bytesRead; +} + +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/file_writer.cpp b/src/caosdb/file_transmission/file_writer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..73b604d4255b213d015e010ea987fdb6a9918f6c --- /dev/null +++ b/src/caosdb/file_transmission/file_writer.cpp @@ -0,0 +1,77 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#include "caosdb/file_transmission/file_writer.h" +#include "caosdb/file_transmission/file_error.h" // for FileIOError +#include <boost/filesystem/path.hpp> // for path +#include <utility> // for move + +namespace caosdb::transaction { + +FileWriter::FileWriter(boost::filesystem::path filename) + : filename_(std::move(filename)) { + this->openFile(); +} + +void FileWriter::openFile() { + stream_.open(filename_, std::ios::binary | std::ios::trunc); + if (!stream_) { + throw FileIOError("Can't open file for writing: " + filename_.string()); + } +} + +void FileWriter::write(const std::string &buffer) { + auto bufferSize = buffer.size(); + if (bufferSize > 0) { + if (!stream_.write(buffer.data(), bufferSize)) { + throw FileIOError("Can't write file: " + filename_.string()); + } + } +} + +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/register_file_upload_handler.cpp b/src/caosdb/file_transmission/register_file_upload_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b2020223f1c79f69e77b1e0950bac1c8f1220986 --- /dev/null +++ b/src/caosdb/file_transmission/register_file_upload_handler.cpp @@ -0,0 +1,84 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#include "caosdb/file_transmission/register_file_upload_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include <boost/log/core/record.hpp> // for record +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue + +namespace caosdb::transaction { + +RegisterFileUploadHandler::~RegisterFileUploadHandler() = default; + +RegisterFileUploadHandler::RegisterFileUploadHandler( + HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *completion_queue, RegisterFileUploadRequest *request, + RegisterFileUploadResponse *response) + : UnaryRpcHandler(completion_queue), tag_(tag), stub_(stub), + request_(request), response_(response) {} + +void RegisterFileUploadHandler::handleNewCallState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter RegisterFileUploadHandler::handleNewCallState."; + + rpc_ = stub_->PrepareAsyncRegisterFileUpload(&call_context, *request_, + completion_queue); + + state_ = CallState::CallComplete; + rpc_->StartCall(); + rpc_->Finish(response_, &status_, tag_); + + CAOSDB_LOG_TRACE(logger_name) + << "Leave RegisterFileUploadHandler::handleNewCallState"; +} + +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/upload_request_handler.cpp b/src/caosdb/file_transmission/upload_request_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..27926bfaabe4dd18feca80f191ff496573196e52 --- /dev/null +++ b/src/caosdb/file_transmission/upload_request_handler.cpp @@ -0,0 +1,217 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#include "caosdb/file_transmission/upload_request_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_ERROR +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for GENERIC_RPC_E... +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <algorithm> // for min +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <cstdint> // for uint64_t +#include <exception> // IWYU pragma: keep +// IWYU pragma: no_include <bits/exception.h> +#include <google/protobuf/arena.h> // for Arena +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWr... +#include <grpcpp/impl/codegen/call_op_set.h> // for WriteOptions +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT... +#include <iostream> // for endl, streamsize +#include <string> // for basic_string +#include <utility> // for move + +namespace caosdb::transaction { +using caosdb::StatusCode; +using caosdb::utility::get_arena; +using google::protobuf::Arena; + +UploadRequestHandler::UploadRequestHandler(HandlerTag tag, + FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, + FileDescriptor file_descriptor) + : tag_(tag), stub_(stub), cq_(cq), + request_(Arena::CreateMessage<FileUploadRequest>(get_arena())), + response_(Arena::CreateMessage<FileUploadResponse>(get_arena())), + state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), + bytesToSend_(0) {} + +bool UploadRequestHandler::OnNext(bool ok) { + try { + if (state_ == CallState::CallComplete) { + this->handleCallCompleteState(); + return false; + } else if (ok) { + if (state_ == CallState::NewCall) { + this->handleNewCallState(); + } else if (state_ == CallState::SendingHeader) { + this->handleSendingHeaderState(); + } else if (state_ == CallState::SendingFile) { + this->handleSendingFileState(); + } else if (state_ == CallState::ExpectingResponse) { + this->handleExpectingResponseState(); + } + } else { + state_ = CallState::CallComplete; + rpc_->Finish(&status_, tag_); + } + + return true; + } catch (std::exception &e) { + CAOSDB_LOG_ERROR(logger_name) + << "UploadRequestHandler caught an exception: " << e.what(); + transaction_status = TransactionStatus::GENERIC_ERROR(e.what()); + state_ = CallState::CallComplete; + } catch (...) { + CAOSDB_LOG_ERROR(logger_name) + << "Transaction error: unknown exception caught"; + transaction_status = TransactionStatus::GENERIC_ERROR( + "UploadRequestHandler caught an unknown exception"); + state_ = CallState::CallComplete; + } + + if (state_ == CallState::NewCall) { + return false; + } + + ctx_.TryCancel(); + + return true; +} + +void UploadRequestHandler::Cancel() { ctx_.TryCancel(); } + +void UploadRequestHandler::handleNewCallState() { + auto filename = file_descriptor_.local_path; + fileReader_ = std::make_unique<FileReader>(filename); + + rpc_ = stub_->PrepareAsyncFileUpload(&ctx_, response_, cq_); + + transaction_status = TransactionStatus::EXECUTING(); + state_ = CallState::SendingHeader; + rpc_->StartCall(tag_); +} + +void UploadRequestHandler::handleSendingHeaderState() { + auto *tid = request_->mutable_chunk()->mutable_file_transmission_id(); + tid->CopyFrom(*(file_descriptor_.file_transmission_id)); + + bytesToSend_ = fileReader_->fileSize(); + + if (bytesToSend_ > 0) { + state_ = CallState::SendingFile; + } else { + state_ = CallState::ExpectingResponse; + } + + rpc_->Write(*request_, tag_); +} + +void UploadRequestHandler::handleSendingFileState() { + const uint64_t DefaultChunkSize = 4 * 1024; // 4K + + auto chunkSize = std::min(DefaultChunkSize, bytesToSend_); + + request_->Clear(); + auto *buffer = request_->mutable_chunk()->mutable_data(); + buffer->resize(chunkSize); + + fileReader_->read(*buffer); + bytesToSend_ -= chunkSize; + + grpc::WriteOptions writeOptions; + if (bytesToSend_ > 0) { + state_ = CallState::SendingFile; + } else { + state_ = CallState::ExpectingResponse; + writeOptions.set_last_message(); + } + + rpc_->Write(*request_, writeOptions, tag_); +} + +void UploadRequestHandler::handleExpectingResponseState() { + state_ = CallState::CallComplete; + rpc_->Finish(&status_, tag_); +} + +void UploadRequestHandler::handleCallCompleteState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter UploadRequestHandler::handleCallCompleteState"; + + switch (status_.error_code()) { + case grpc::OK: { + auto bytesSent = fileReader_ != nullptr ? fileReader_->fileSize() : 0; + CAOSDB_LOG_INFO(logger_name) + << "UploadRequestHandler finished successfully (" + << file_descriptor_.local_path << "): upload complete, " << bytesSent + << " bytes sent"; + } break; + default: { + auto code(static_cast<StatusCode>(status_.error_code())); + std::string description(get_status_description(code) + + " Original message: " + status_.error_message()); + transaction_status = TransactionStatus(code, description); + CAOSDB_LOG_ERROR(logger_name) + << "UploadRequestHandler finished with an error (" + << file_descriptor_.local_path << "): Upload aborted with code " << code + << " - " << description; + } break; + } + + CAOSDB_LOG_TRACE(logger_name) + << "Leave UploadRequestHandler::handleCallCompleteState"; +} + +} // namespace caosdb::transaction diff --git a/src/caosdb/logging.cpp b/src/caosdb/logging.cpp index d8f2de95233dffcbaf4aef2640cbea569f4ba006..5e6cd4e3577b57ec7491a26ae71982667a096609 100644 --- a/src/caosdb/logging.cpp +++ b/src/caosdb/logging.cpp @@ -165,6 +165,7 @@ auto initialize_logging(const LoggingConfiguration &configuration) -> void { off_settings["Core.DisableLogging"] = true; boost::log::init_from_settings(off_settings); + // now set everything up boost::log::settings new_settings; if (configuration.GetLevel() == CAOSDB_LOG_LEVEL_OFF) { diff --git a/src/caosdb/protobuf_helper.cpp b/src/caosdb/protobuf_helper.cpp index a9ad000595285f1c0fb6402182e0d48294daa37d..e8bbd07834ead9b561c7e8769ed834527337f7a6 100644 --- a/src/caosdb/protobuf_helper.cpp +++ b/src/caosdb/protobuf_helper.cpp @@ -19,7 +19,8 @@ * */ #include "caosdb/protobuf_helper.h" -#include <google/protobuf/arena.h> +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/extension_set.h> // for Arena namespace caosdb::utility { diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index a7ddba88dc32543a0daf63ad5b4c199b56d2b919..a46019f3a5b83cc0bb058aa6446100bceac87242 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -18,23 +18,32 @@ * */ #include "caosdb/transaction.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS... -#include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest -#include "caosdb/logging.h" -#include "caosdb/protobuf_helper.h" // for get_arena -#include "caosdb/status_code.h" // for StatusCode, AUTHEN... -#include "google/protobuf/arena.h" // for Arena -#include "grpcpp/grpcpp.h" // for CompletionQueue -#include "grpcpp/impl/codegen/async_unary_call.h" // for ClientAsyncRespons... -#include "grpcpp/impl/codegen/client_context.h" // for ClientContext -#include "grpcpp/impl/codegen/completion_queue.h" // for CompletionQueue -#include "grpcpp/impl/codegen/status.h" // for Status -#include "grpcpp/impl/codegen/status_code_enum.h" // for StatusCode, UNAUTH... -#include <cassert> // for assert -#include <map> // for map -#include <memory> // for allocator, unique_ptr +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac... +#include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe... +#include "caosdb/file_transmission/download_request_handler.h" // Download... +#include "caosdb/file_transmission/file_reader.h" // for path +#include "caosdb/file_transmission/register_file_upload_handler.h" +#include "caosdb/file_transmission/upload_request_handler.h" // Upload... +#include "caosdb/logging.h" // for CAOSDB_LOG_FATAL +#include "caosdb/status_code.h" // for StatusCode +#include "caosdb/transaction_handler.h" +#include <algorithm> // for max +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +// IWYU pragma: no_include <bits/exception.h> +#include <exception> // IWYU pragma: keep +#include <google/protobuf/arena.h> // for Arena +#include <grpc/impl/codegen/gpr_types.h> +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <iosfwd> // for streamsize +#include <map> // for map, operator!= +#include <memory> // for unique_ptr #include <stdexcept> // for out_of_range -#include <utility> // for move +#include <utility> // for move, pair namespace caosdb { @@ -79,6 +88,14 @@ auto get_status_description(int code) -> const std::string & { "have " "an id. This is the case when you did not retrieve it before applying any " "changes and instantiated the Entity class explicitely."}, + {StatusCode::NOT_A_FILE_ENTITY, "You can only add files to file entities."}, + {StatusCode::PATH_IS_A_DIRECTORY, "The given path is a directory."}, + {StatusCode::FILE_DOES_NOT_EXIST_LOCALLY, + "The file does not not exist in the local file system."}, + {StatusCode::FILE_DOWNLOAD_ERROR, + "The transaction failed during the download of the files"}, + {StatusCode::FILE_UPLOAD_ERROR, + "The transaction failed during the upload of the files"}, {StatusCode::UNSUPPORTED_FEATURE, "This feature is not available in the this client implementation."}, {StatusCode::EXTERN_C_ASSIGNMENT_ERROR, @@ -99,16 +116,17 @@ auto get_status_description(int code) -> const std::string & { namespace caosdb::transaction { using caosdb::entity::v1alpha1::EntityTransactionService; +using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionResponse; -using WrappedResponseCase = - caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase; -using QueryResponseCase = - caosdb::entity::v1alpha1::RetrieveResponse::QueryResponseCase; -using caosdb::utility::get_arena; -using grpc::ClientAsyncResponseReader; +using TransactionResponseCase = + caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; +using RetrieveResponseCase = + caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase; using ProtoEntity = caosdb::entity::v1alpha1::Entity; -using grpc::CompletionQueue; +using google::protobuf::Arena; +using NextStatus = grpc::CompletionQueue::NextStatus; +using RegistrationStatus = caosdb::entity::v1alpha1::RegistrationStatus; ResultSet::iterator::iterator(const ResultSet *result_set_param, int index) : current_index(index), result_set(result_set_param) {} @@ -141,15 +159,15 @@ auto ResultSet::end() const -> ResultSet::iterator { } MultiResultSet::MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set) - : entities(std::move(result_set)) {} + : AbstractMultiResultSet(std::move(result_set)) {} Transaction::Transaction( - std::shared_ptr<EntityTransactionService::Stub> service_stub) - : request(google::protobuf::Arena::CreateMessage<MultiTransactionRequest>( - get_arena())), - response(google::protobuf::Arena::CreateMessage<MultiTransactionResponse>( - get_arena())) { - this->service_stub = std::move(service_stub); + std::shared_ptr<EntityTransactionService::Stub> entity_service, + std::shared_ptr<FileTransmissionService::Stub> file_service) + : request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())), + response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())) { + this->entity_service = std::move(entity_service); + this->file_service = std::move(file_service); this->query_count = -1; } @@ -163,6 +181,21 @@ auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode { return this->status.GetCode(); } +auto Transaction::RetrieveAndDownloadFilesById( + const std::string &id, const std::string &local_path) noexcept -> StatusCode { + ASSERT_CAN_ADD_RETRIEVAL + + auto *retrieve_request = + this->request->add_requests()->mutable_retrieve_request(); + retrieve_request->set_id(id); + retrieve_request->set_register_file_download(true); + + download_files[id].local_path = local_path; + + this->status = TransactionStatus::GO_ON(); + return this->status.GetCode(); +} + auto Transaction::Query(const std::string &query) noexcept -> StatusCode { ASSERT_CAN_ADD_QUERY @@ -189,11 +222,18 @@ auto Transaction::DeleteById(const std::string &id) noexcept -> StatusCode { auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode { ASSERT_CAN_ADD_INSERTION - auto *sub_request = this->request->add_requests(); - auto *proto_entity = sub_request->mutable_insert_request(); + auto *entity_request = this->request->add_requests() + ->mutable_insert_request() + ->mutable_entity_request(); + auto *proto_entity = entity_request->mutable_entity(); // copy the original entity for the transaction entity->CopyTo(proto_entity); + if (entity->HasFile()) { + auto *file_transmission_id = entity_request->mutable_upload_id(); + entity->SetFileTransmissionId(file_transmission_id); + upload_files.push_back(entity->GetFileDescriptor()); + } this->status = TransactionStatus::READY(); return this->status.GetCode(); } @@ -201,10 +241,17 @@ auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode { auto Transaction::UpdateEntity(Entity *entity) noexcept -> StatusCode { ASSERT_CAN_ADD_UPDATE - auto *sub_request = this->request->add_requests(); - auto *proto_entity = sub_request->mutable_update_request(); + auto *entity_request = this->request->add_requests() + ->mutable_update_request() + ->mutable_entity_request(); + auto *proto_entity = entity_request->mutable_entity(); entity->CopyTo(proto_entity); + if (entity->HasFile()) { + auto *file_transmission_id = entity_request->mutable_upload_id(); + entity->SetFileTransmissionId(file_transmission_id); + upload_files.push_back(entity->GetFileDescriptor()); + } this->status = TransactionStatus::READY(); return this->status.GetCode(); } @@ -218,7 +265,8 @@ auto Transaction::Execute() -> TransactionStatus { return status; } -auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { +// TODO(tf) This has apparently a cognitive complexity of 39>25 (threshold). +auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT if (!IsStatus(TransactionStatus::READY()) && !IsStatus(TransactionStatus::GO_ON())) { return StatusCode::TRANSACTION_STATUS_ERROR; @@ -241,103 +289,250 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { } this->status = TransactionStatus::EXECUTING(); - grpc::Status grpc_status; - CompletionQueue cq; + // upload files first + if (!upload_files.empty()) { + CAOSDB_LOG_INFO(logger_name) + << "Number of files to be uploaded: " << upload_files.size(); + + auto *registration_request = + Arena::CreateMessage<RegisterFileUploadRequest>(GetArena()); + auto *registration_response = + Arena::CreateMessage<RegisterFileUploadResponse>(GetArena()); + + handler_ = std::make_unique<RegisterFileUploadHandler>( + &handler_, file_service.get(), &completion_queue, registration_request, + registration_response); + this->status = ProcessCalls(); + + if (registration_response->status() != + RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { + this->status = TransactionStatus::FILE_UPLOAD_ERROR(); + return StatusCode::EXECUTING; + } + + for (auto &file_descriptor : upload_files) { + file_descriptor.file_transmission_id->set_registration_id( + registration_response->registration_id()); + CAOSDB_LOG_INFO(logger_name) + << "Uploading " << file_descriptor.local_path; + handler_ = std::make_unique<UploadRequestHandler>( + &handler_, file_service.get(), &completion_queue, file_descriptor); + this->status = ProcessCalls(); + if (this->status.GetCode() != StatusCode::EXECUTING) { + return StatusCode::EXECUTING; + } + } + } - grpc::ClientContext context; - std::unique_ptr<ClientAsyncResponseReader<MultiTransactionResponse>> rpc( - this->service_stub->PrepareAsyncMultiTransaction(&context, *(this->request), - &cq)); - rpc->StartCall(); + handler_ = std::make_unique<EntityTransactionHandler>( + &handler_, entity_service.get(), &completion_queue, request, response); + this->status = ProcessCalls(); + if (this->status.GetCode() != StatusCode::EXECUTING) { + return StatusCode::EXECUTING; + } - int tag = 1; - void *send_tag = static_cast<void *>(&tag); - rpc->Finish(this->response, &grpc_status, send_tag); - void *recv_tag = nullptr; - bool ok = false; + // file download afterwards + if (status.GetCode() == StatusCode::EXECUTING && !download_files.empty()) { + // run over all retrieved entities and get the download_id + for (auto sub_response : *(response->mutable_responses())) { + if (sub_response.transaction_response_case() == + TransactionResponseCase::kRetrieveResponse) { + if (sub_response.retrieve_response() + .entity_response() + .has_download_id()) { + auto *entity_response = + sub_response.mutable_retrieve_response()->mutable_entity_response(); + auto entity_id = entity_response->entity().id(); + download_files[entity_id].file_transmission_id = + entity_response->release_download_id(); + // TODO(tf) handle error + } + } + } - // TODO(tf) make this actually asynchronous by moving this to WaitForIt() - cq.Next(&recv_tag, &ok); - assert(recv_tag == send_tag); - assert(ok); + for (const auto &item : download_files) { + auto file_descriptor(item.second); + CAOSDB_LOG_INFO(logger_name) + << "Downloading " << file_descriptor.local_path; - if (!grpc_status.ok()) { - switch (grpc_status.error_code()) { - case grpc::StatusCode::UNAUTHENTICATED: - this->status = TransactionStatus::AUTHENTICATION_ERROR(); - break; - case grpc::StatusCode::UNAVAILABLE: - this->status = TransactionStatus::CONNECTION_ERROR(); - break; - default: - auto error_details = std::to_string(grpc_status.error_code()) + " - " + - grpc_status.error_message(); - this->status = TransactionStatus::RPC_ERROR(error_details); + handler_ = std::make_unique<DownloadRequestHandler>( + &handler_, file_service.get(), &completion_queue, file_descriptor); + this->status = ProcessCalls(); + if (this->status.GetCode() != StatusCode::EXECUTING) { + return StatusCode::EXECUTING; + } } - } else { - this->status = TransactionStatus::SUCCESS(); } return StatusCode::EXECUTING; } -auto Transaction::WaitForIt() const noexcept -> TransactionStatus { +// TODO(tf) This has apparently a cognitive complexity of 36>25 (threshold). +auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT + if (this->status.GetCode() != StatusCode::EXECUTING) { + return this->status; + } + this->status = TransactionStatus::SUCCESS(); + bool set_error = false; auto *responses = this->response->mutable_responses(); std::vector<std::unique_ptr<Entity>> entities; - for (auto sub_response : *responses) { + for (auto &sub_response : *responses) { std::unique_ptr<Entity> result; - switch (sub_response.wrapped_response_case()) { + switch (sub_response.transaction_response_case()) { - case WrappedResponseCase::kRetrieveResponse: { + case TransactionResponseCase::kRetrieveResponse: { auto *retrieve_response = sub_response.mutable_retrieve_response(); - switch (retrieve_response->query_response_case()) { - case QueryResponseCase::kEntity: { - auto *retrieve_entity_response = retrieve_response->release_entity(); + switch (retrieve_response->retrieve_response_case()) { + case RetrieveResponseCase::kEntityResponse: { + auto *retrieve_entity_response = + retrieve_response->release_entity_response(); result = std::make_unique<Entity>(retrieve_entity_response); } break; - case QueryResponseCase::kSelectResult: { + case RetrieveResponseCase::kSelectResult: { + CAOSDB_LOG_ERROR(logger_name) << "Results of a SELECT query cannot be " + "processed by this client yet."; // TODO(tf) Select queries } break; - case QueryResponseCase::kCountResult: { + case RetrieveResponseCase::kCountResult: { this->query_count = retrieve_response->count_result(); } break; + case RetrieveResponseCase::kFindResult: { + std::unique_ptr<Entity> find_result; + for (auto &entity_response : + *retrieve_response->mutable_find_result()->mutable_result_set()) { + find_result = std::make_unique<Entity>(&entity_response); + if (find_result->HasErrors()) { + set_error = true; + } + entities.push_back(std::move(find_result)); + } + } break; default: - // TODO(tf) Error + CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase."; break; } - break; + break; // break TransactionResponseCase::kRetrieveResponse } - case WrappedResponseCase::kInsertResponse: { - auto *inserted_id_response = sub_response.release_insert_response(); + case TransactionResponseCase::kInsertResponse: { + auto *inserted_id_response = + sub_response.mutable_insert_response()->release_id_response(); result = std::make_unique<Entity>(inserted_id_response); break; } - case WrappedResponseCase::kDeleteResponse: { - auto *deleted_id_response = sub_response.release_delete_response(); + case TransactionResponseCase::kDeleteResponse: { + auto *deleted_id_response = + sub_response.mutable_delete_response()->release_id_response(); result = std::make_unique<Entity>(deleted_id_response); break; } - case WrappedResponseCase::kUpdateResponse: { - auto *updated_id_response = sub_response.release_update_response(); + case TransactionResponseCase::kUpdateResponse: { + auto *updated_id_response = + sub_response.mutable_update_response()->release_id_response(); result = std::make_unique<Entity>(updated_id_response); break; } default: - // TODO(tf) Error + CAOSDB_LOG_FATAL(logger_name) + << "Received invalid TransactionResponseCase."; break; } - if (result->HasErrors()) { - this->status = TransactionStatus::TRANSACTION_ERROR( - "The request terminated with errors."); + if (result != nullptr) { + if (result->HasErrors()) { + set_error = true; + } + entities.push_back(std::move(result)); + } + } + + // copy local path of downloaded files into the entities file descriptor + for (auto &entity : entities) { + auto id = entity->GetId(); + if (!id.empty() && download_files.count(id) == 1) { + const auto &local_path = download_files.at(id).local_path; + entity->SetLocalPath(local_path); } - entities.push_back(std::move(result)); } this->result_set = std::make_unique<MultiResultSet>(std::move(entities)); - //} + + if (set_error) { + this->status = TransactionStatus::TRANSACTION_ERROR( + "The request terminated with errors."); + } return this->status; } +auto Transaction::ProcessCalls() -> TransactionStatus { + gpr_timespec deadline; + deadline.tv_sec = 1; + deadline.tv_nsec = 0; + deadline.clock_type = gpr_clock_type::GPR_TIMESPAN; + + TransactionStatus result = TransactionStatus::EXECUTING(); + handler_->Start(); + void *tag = nullptr; + bool ok = false; + while (true) { + switch (completion_queue.AsyncNext(&tag, &ok, deadline)) { + case NextStatus::GOT_EVENT: { + if (tag != nullptr) { + auto res = handler_->OnNext(ok); + if (!res) { + // The handler has finished it's work + result = handler_->GetStatus(); + handler_.reset(); + return result; + } + } else { + std::string description("Invalid tag delivered by notification queue."); + CAOSDB_LOG_ERROR(logger_name) << description; + handler_.reset(); + return TransactionStatus::RPC_ERROR(description); + } + } break; + case NextStatus::SHUTDOWN: { + CAOSDB_LOG_ERROR(logger_name) + << "Notification queue has been shut down unexpectedly."; + result = handler_->GetStatus(); + handler_.reset(); + return result; + } break; + case NextStatus::TIMEOUT: { + CAOSDB_LOG_DEBUG(logger_name) << "Timeout, waiting..."; + } break; + default: + CAOSDB_LOG_FATAL(logger_name) + << "Got an invalid NextStatus from CompletionQueue."; + result = handler_->GetStatus(); + handler_.reset(); + return result; + } + } + result = handler_->GetStatus(); + handler_.reset(); + return result; +} + +Transaction::~Transaction() { + this->Cancel(); + + completion_queue.Shutdown(); + + // drain the queue + void *ignoredTag = nullptr; + bool ok = false; + while (completion_queue.Next(&ignoredTag, &ok)) { + ; + } +} + +void Transaction::Cancel() { + // TODO(tf) State Canceled + if (handler_ != nullptr) { + handler_->Cancel(); + } +} + } // namespace caosdb::transaction diff --git a/src/caosdb/transaction_handler.cpp b/src/caosdb/transaction_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cf020af35bf56b2e68da11a92b0961995397f2fd --- /dev/null +++ b/src/caosdb/transaction_handler.cpp @@ -0,0 +1,40 @@ +#include "caosdb/transaction_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <exception> // IWYU pragma: keep +// IWYU pragma: no_include <bits/exception.h> +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <iosfwd> // for streamsize + +namespace caosdb::transaction { + +EntityTransactionHandler::EntityTransactionHandler( + HandlerTag tag, EntityTransactionService::Stub *stub, + grpc::CompletionQueue *completion_queue, MultiTransactionRequest *request, + MultiTransactionResponse *response) + : UnaryRpcHandler(completion_queue), tag_(tag), stub_(stub), + request_(request), response_(response) {} + +void EntityTransactionHandler::handleNewCallState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter EntityTransactionHandler::handleNewCallState with " + "CompletionQueue " + << completion_queue; + + rpc_ = stub_->PrepareAsyncMultiTransaction(&call_context, *request_, + completion_queue); + + state_ = CallState::CallComplete; + rpc_->StartCall(); + rpc_->Finish(response_, &status_, tag_); + + CAOSDB_LOG_TRACE(logger_name) + << "Leave EntityTransactionHandler::handleNewCallState"; +} + +} // namespace caosdb::transaction diff --git a/src/caosdb/unary_rpc_handler.cpp b/src/caosdb/unary_rpc_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4d1e1cf618ef119278b02fdc3657d17482a87601 --- /dev/null +++ b/src/caosdb/unary_rpc_handler.cpp @@ -0,0 +1,135 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#include "caosdb/unary_rpc_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include "caosdb/status_code.h" // for GENERIC_RPC_E... +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +// IWYU pragma: no_include <bits/exception.h> +#include <exception> // IWYU pragma: keep +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT... +#include <iosfwd> // for streamsize +#include <string> // for string, opera... + +namespace caosdb::transaction { + +bool UnaryRpcHandler::OnNext(bool ok) { + try { + if (ok) { + if (state_ == CallState::NewCall) { + this->handleNewCallState(); + } else if (state_ == CallState::CallComplete) { + this->handleCallCompleteState(); + return false; + } + } else { + CAOSDB_LOG_ERROR(logger_name) + << "UnaryRpcHandler::OnNext(false)!. This should not happen."; + // TODO(tf) Handle this error: + // in CallComplete state: "Client-side Finish: ok should always be true" + // in ReceivingFile state: "ok indicates that the RPC is going to go to + // the wire. If it is false, it not going to the wire. This would happen + // if the channel is either permanently broken or transiently broken but + // with the fail-fast option. (Note that async unary RPCs don't post a CQ + // tag at this point, nor do client-streaming or bidi-streaming RPCs that + // have the initial metadata corked option set.)" + } + + return true; + } catch (std::exception &e) { + CAOSDB_LOG_ERROR(logger_name) + << "UnaryRpcHandler caught an exception: " << e.what(); + transaction_status = TransactionStatus::GENERIC_ERROR(e.what()); + state_ = CallState::CallComplete; + } catch (...) { + CAOSDB_LOG_ERROR(logger_name) + << "Transaction error: unknown exception caught"; + transaction_status = TransactionStatus::GENERIC_ERROR( + "UnaryRpcHandler caught an unknown exception"); + state_ = CallState::CallComplete; + } + + if (state_ != CallState::NewCall) { + call_context.TryCancel(); + } + + return false; +} + +void UnaryRpcHandler::Cancel() { call_context.TryCancel(); } + +void UnaryRpcHandler::handleCallCompleteState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter UnaryRpcHandler::handleCallCompleteState"; + + switch (status_.error_code()) { + case grpc::OK: + CAOSDB_LOG_INFO(logger_name) << "UnaryRpcHandler finished successfully."; + break; + default: + auto code(static_cast<StatusCode>(status_.error_code())); + std::string description(get_status_description(code) + + " Original message: " + status_.error_message()); + transaction_status = TransactionStatus(code, description); + CAOSDB_LOG_ERROR(logger_name) + << "UnaryRpcHandler finished with an error (Code " << code + << "): " << description; + break; + } + + CAOSDB_LOG_TRACE(logger_name) + << "Leave UnaryRpcHandler::handleCallCompleteState"; +} + +} // namespace caosdb::transaction diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 389d5d919f8200a77ea1ffb93267c11ac70baf75..0ecb50169e94b5f758a429054c80c80e9d420c4d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -23,6 +23,7 @@ set(test_cases test_configuration test_connection test_entity + test_file_transmission test_info test_protobuf test_transaction @@ -36,7 +37,7 @@ set(test_cases # special linting for tests set(_CMAKE_CXX_CLANG_TIDY_TEST_CHECKS - "${_CMAKE_CXX_CLANG_TIDY_CHECKS},-cert-err58-cpp,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-owning-memory,-modernize-use-trailing-return-type,-google-readability-avoid-underscore-in-googletest-name,-cppcoreguidelines-avoid-magic-numbers,-readability-magic-numbers,-cppcoreguidelines-avoid-goto,-hicpp-avoid-goto,-readability-function-cognitive-complexity" + "${_CMAKE_CXX_CLANG_TIDY_CHECKS},-cert-err58-cpp,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-owning-memory,-modernize-use-trailing-return-type,-google-readability-avoid-underscore-in-googletest-name,-cppcoreguidelines-avoid-magic-numbers,-readability-magic-numbers,-cppcoreguidelines-avoid-goto,-hicpp-avoid-goto,-readability-function-cognitive-complexity,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-private-member-variables-in-classes" ) # add special cmake functions for gtest diff --git a/test/caosdb_test_utility.h.in b/test/caosdb_test_utility.h.in index 24d088bb9ff8a0c82d117da35241f1d8acc18012..baac638f5668a05c6464fef99410bb345ba8b3c3 100644 --- a/test/caosdb_test_utility.h.in +++ b/test/caosdb_test_utility.h.in @@ -22,6 +22,7 @@ #ifndef CAOSDB_TEST_UTILITY_H #define CAOSDB_TEST_UTILITY_H +#include <string> /** * @file caosdb_test_utility.h * @brief Utility for the unit tests diff --git a/test/test_entity.cpp b/test/test_entity.cpp index 4ea5365250a3b7c8979ae0faebba73ae87f9044d..49b3c7c49f41efbc33865c56362da084e8ede297 100644 --- a/test/test_entity.cpp +++ b/test/test_entity.cpp @@ -20,19 +20,21 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. * */ +#include "caosdb_test_utility.h" #include "caosdb/entity.h" // for Entity, Parent, Par... #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionSe... #include "caosdb/entity/v1alpha1/main.pb.h" // for IdResponse, Message #include "caosdb/message_code.h" // for MessageCode #include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for StatusCode, FILE_DO... #include "caosdb/transaction.h" // for Transaction -#include "google/protobuf/arena.h" // for Arena -#include "gtest/gtest-message.h" // for Message -#include "gtest/gtest-test-part.h" // for TestPartResult, Sui... -#include "gtest/gtest_pred_impl.h" // for Test, EXPECT_EQ +#include <google/protobuf/arena.h> // for Arena +#include <gtest/gtest-message.h> // for Message +#include <gtest/gtest-test-part.h> // for TestPartResult, Sui... +#include <gtest/gtest_pred_impl.h> // for Test, EXPECT_EQ #include <iostream> -#include <memory> // for allocator, shared_ptr -#include <string> +#include <memory> // for allocator, shared_ptr +#include <string> // for operator+, string namespace caosdb::entity { using caosdb::entity::v1alpha1::IdResponse; @@ -54,11 +56,11 @@ TEST(test_entity, test_append_parent) { parent.SetId("some-id"); auto entity = Entity(); - EXPECT_EQ(entity.GetParents().Size(), 0); + EXPECT_EQ(entity.GetParents().size(), 0); entity.AppendParent(parent); - EXPECT_EQ(entity.GetParents().Size(), 1); + EXPECT_EQ(entity.GetParents().size(), 1); - auto same_parent = entity.GetParents().At(0); + auto same_parent = entity.GetParents().at(0); EXPECT_EQ(same_parent.GetId(), "some-id"); } @@ -90,11 +92,11 @@ TEST(test_entity, test_append_property) { prop.SetUnit("prop_unit"); prop.SetDatatype("prop_dtype"); - EXPECT_EQ(entity.GetProperties().Size(), 0); + EXPECT_EQ(entity.GetProperties().size(), 0); entity.AppendProperty(prop); - EXPECT_EQ(entity.GetProperties().Size(), 1); + EXPECT_EQ(entity.GetProperties().size(), 1); - auto same_prop = entity.GetProperties().At(0); + auto same_prop = entity.GetProperties().at(0); EXPECT_EQ(prop.GetName(), same_prop.GetName()); EXPECT_EQ(prop.GetId(), same_prop.GetId()); @@ -128,15 +130,16 @@ TEST(test_entity, test_copy_to) { EXPECT_EQ(entity.GetRole(), copied.GetRole()); EXPECT_EQ(entity.GetName(), copied.GetName()); - EXPECT_EQ(copied.GetParents().At(0).GetId(), parent.GetId()); - EXPECT_EQ(copied.GetParents().At(0).GetName(), parent.GetName()); - EXPECT_EQ(copied.GetProperties().At(0).GetId(), prop.GetId()); - EXPECT_EQ(copied.GetProperties().At(0).GetName(), prop.GetName()); + EXPECT_EQ(copied.GetParents().at(0).GetId(), parent.GetId()); + EXPECT_EQ(copied.GetParents().at(0).GetName(), parent.GetName()); + EXPECT_EQ(copied.GetProperties().at(0).GetId(), prop.GetId()); + EXPECT_EQ(copied.GetProperties().at(0).GetName(), prop.GetName()); } TEST(test_entity, test_insert_entity) { auto transaction = caosdb::transaction::Transaction( - std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr)); + std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr), + std::shared_ptr<transaction::FileTransmissionService::Stub>(nullptr)); auto entity = Entity(); entity.SetRole("entity_role"); @@ -153,7 +156,8 @@ TEST(test_entity, test_insert_entity) { TEST(test_entity, test_insert_with_role) { auto transaction = caosdb::transaction::Transaction( - std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr)); + std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr), + std::shared_ptr<transaction::FileTransmissionService::Stub>(nullptr)); auto entity = Entity(); entity.SetRole("Property"); @@ -173,7 +177,8 @@ TEST(test_entity, test_insert_with_role) { TEST(test_entity, test_insert_with_parent) { auto transaction = caosdb::transaction::Transaction( - std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr)); + std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr), + std::shared_ptr<transaction::FileTransmissionService::Stub>(nullptr)); auto entity = Entity(); entity.SetName("entity_name"); @@ -189,15 +194,16 @@ TEST(test_entity, test_insert_with_parent) { transaction.InsertEntity(&entity); EXPECT_EQ(entity.GetName(), "entity_name"); - EXPECT_EQ(entity.GetParents().Size(), 1); - auto inserted_parent = entity.GetParents().At(0); + EXPECT_EQ(entity.GetParents().size(), 1); + auto inserted_parent = entity.GetParents().at(0); EXPECT_EQ(inserted_parent.GetId(), parent.GetId()); EXPECT_EQ(inserted_parent.GetName(), parent.GetName()); } TEST(test_entity, test_insert_with_property) { auto transaction = caosdb::transaction::Transaction( - std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr)); + std::shared_ptr<transaction::EntityTransactionService::Stub>(nullptr), + std::shared_ptr<transaction::FileTransmissionService::Stub>(nullptr)); auto entity = Entity(); entity.SetName("entity_name"); @@ -214,9 +220,9 @@ TEST(test_entity, test_insert_with_property) { transaction.InsertEntity(&entity); - EXPECT_EQ(entity.GetProperties().Size(), 1); + EXPECT_EQ(entity.GetProperties().size(), 1); - auto inserted_prop = entity.GetProperties().At(0); + auto inserted_prop = entity.GetProperties().at(0); EXPECT_EQ(prop.GetName(), inserted_prop.GetName()); EXPECT_EQ(prop.GetId(), inserted_prop.GetId()); @@ -229,7 +235,7 @@ TEST(test_entity, test_insert_with_property) { TEST(test_entity, test_from_id_response) { IdResponse idResponse; idResponse.set_id("entity_id"); - auto *error = idResponse.add_entity_errors(); + auto *error = idResponse.add_errors(); error->set_code(MessageCode::ENTITY_DOES_NOT_EXIST); error->set_description("error_desc"); @@ -237,31 +243,49 @@ TEST(test_entity, test_from_id_response) { EXPECT_EQ(entity.GetId(), "entity_id"); EXPECT_TRUE(entity.HasErrors()); - EXPECT_EQ(entity.GetErrors().Size(), 1); - EXPECT_EQ(entity.GetErrors().At(0).GetDescription(), "error_desc"); - EXPECT_EQ(entity.GetErrors().At(0).GetCode(), + EXPECT_EQ(entity.GetErrors().size(), 1); + EXPECT_EQ(entity.GetErrors().at(0).GetDescription(), "error_desc"); + EXPECT_EQ(entity.GetErrors().at(0).GetCode(), MessageCode::ENTITY_DOES_NOT_EXIST); IdResponse idr_warnings_and_infos; idr_warnings_and_infos.set_id("other_entity_id"); - auto *warning = idr_warnings_and_infos.add_entity_warnings(); + auto *warning = idr_warnings_and_infos.add_warnings(); warning->set_description("warning_desc"); warning->set_code(MessageCode::ENTITY_HAS_NO_PROPERTIES); - auto *info = idr_warnings_and_infos.add_entity_infos(); + auto *info = idr_warnings_and_infos.add_infos(); info->set_description("info_desc"); info->set_code(MessageCode::UNSPECIFIED); Entity other_ent(&idr_warnings_and_infos); EXPECT_EQ(other_ent.GetId(), "other_entity_id"); - EXPECT_EQ(other_ent.GetWarnings().Size(), 1); + EXPECT_EQ(other_ent.GetWarnings().size(), 1); EXPECT_TRUE(other_ent.HasWarnings()); - EXPECT_EQ(other_ent.GetWarnings().At(0).GetDescription(), "warning_desc"); - EXPECT_EQ(other_ent.GetWarnings().At(0).GetCode(), + EXPECT_EQ(other_ent.GetWarnings().at(0).GetDescription(), "warning_desc"); + EXPECT_EQ(other_ent.GetWarnings().at(0).GetCode(), MessageCode::ENTITY_HAS_NO_PROPERTIES); - EXPECT_EQ(other_ent.GetInfos().Size(), 1); - EXPECT_EQ(other_ent.GetInfos().At(0).GetDescription(), "info_desc"); - EXPECT_EQ(other_ent.GetInfos().At(0).GetCode(), MessageCode::UNSPECIFIED); + EXPECT_EQ(other_ent.GetInfos().size(), 1); + EXPECT_EQ(other_ent.GetInfos().at(0).GetDescription(), "info_desc"); + EXPECT_EQ(other_ent.GetInfos().at(0).GetCode(), MessageCode::UNSPECIFIED); +} + +TEST(test_entity, test_add_file_to_non_file_entity) { + Entity entity; + EXPECT_EQ(entity.SetLocalPath("local/path"), StatusCode::NOT_A_FILE_ENTITY); +} + +TEST(test_entity, test_add_non_existing_file) { + Entity entity; + entity.SetRole("File"); + EXPECT_EQ(entity.SetLocalPath("non-existing/path"), + StatusCode::FILE_DOES_NOT_EXIST_LOCALLY); +} + +TEST(test_entity, test_add_directory_path) { + Entity entity; + entity.SetRole("File"); + EXPECT_EQ(entity.SetLocalPath("./"), StatusCode::PATH_IS_A_DIRECTORY); } TEST(test_entity, test_remove_property) { @@ -389,4 +413,11 @@ TEST(test_entity, test_description) { EXPECT_EQ(property.GetDescription(), "desc property"); EXPECT_EQ(parent.GetDescription(), "desc parent"); } + +TEST(test_entity, test_add_file) { + Entity entity; + entity.SetRole("File"); + EXPECT_EQ(entity.SetLocalPath(TEST_DATA_DIR + "/test.json"), + StatusCode::SUCCESS); +} } // namespace caosdb::entity diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp new file mode 100644 index 0000000000000000000000000000000000000000..801b89db3a07f5b96e530cfbdfc8335c892a1331 --- /dev/null +++ b/test/test_file_transmission.cpp @@ -0,0 +1,63 @@ +/* + * This file is a part of the CaosDB Project. + * + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#include "caosdb/file_transmission/file_writer.h" +#include "caosdb/file_transmission/file_reader.h" +#include <boost/filesystem/operations.hpp> // for exists, file_size, remove +#include <boost/filesystem/path.hpp> // for path +#include <boost/filesystem/path_traits.hpp> // for filesystem +#include <gtest/gtest-message.h> // for Message +#include <gtest/gtest-test-part.h> // for TestPartResult, SuiteApiResolver +#include <gtest/gtest_pred_impl.h> // for Test, EXPECT_EQ, AssertionResult +#include <string> // for string + +namespace fs = boost::filesystem; + +namespace caosdb::transaction { + +class test_file_transmission : public ::testing::Test { +protected: + fs::path test_file_name; + + void SetUp() override { + test_file_name = fs::path("this_is_a_test_file_remove_me.dat"); + } + + void TearDown() override { fs::remove(test_file_name); } +}; + +TEST_F(test_file_transmission, test_file_writer_reader) { + ASSERT_FALSE(fs::exists(test_file_name)); + + FileWriter writer(test_file_name); + std::string buffer_out(1024, 'c'); + for (int i = 0; i < 8; i++) { + writer.write(buffer_out); + EXPECT_EQ(fs::file_size(test_file_name), 1024 * (i + 1)); + } + + FileReader reader(test_file_name); + std::string buffer_in(1024, '\0'); + for (int i = 0; i < 8; i++) { + reader.read(buffer_in); + EXPECT_EQ(buffer_in, std::string(1024, 'c')); + } +} + +} // namespace caosdb::transaction diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index d50ada35a4093f13c77453fdc5bfd4a30d265139..69dac411878e67cd2351683969ffa5f5ca23f1e9 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -24,11 +24,12 @@ #include "caosdb/exceptions.h" // for ConnectionError #include "caosdb/status_code.h" #include "caosdb/transaction.h" // for Transaction +#include "caosdb/transaction_handler.h" // for MultiTransactionResponse #include "caosdb/transaction_status.h" // for ConnectionError #include "caosdb_test_utility.h" // for EXPECT_THROW_MESSAGE -#include "gtest/gtest-message.h" // for Message -#include "gtest/gtest-test-part.h" // for SuiteApiResolver, TestPa... -#include "gtest/gtest_pred_impl.h" // for Test, TestInfo, TEST +#include <gtest/gtest-message.h> // for Message +#include <gtest/gtest-test-part.h> // for SuiteApiResolver, TestPa... +#include <gtest/gtest_pred_impl.h> // for Test, TestInfo, TEST #include <memory> // for allocator, unique_ptr #include <stdexcept> // for out_of_range #include <string> // for string, basic_string @@ -43,6 +44,20 @@ using caosdb::exceptions::ConnectionError; using ProtoEntity = caosdb::entity::v1alpha1::Entity; using caosdb::entity::v1alpha1::RetrieveResponse; +TEST(test_transaction, create_transaction) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + ASSERT_EQ(StatusCode::GO_ON, transaction->RetrieveById("100")); + EXPECT_THROW_MESSAGE( + transaction->Execute(), ConnectionError, + "The attempt to execute this transaction was not successful because the " + "connection to the server could not be established. " + "Original message: failed to connect to all addresses"); +} + TEST(test_transaction, test_multi_result_set) { std::vector<std::unique_ptr<Entity>> entities; for (int i = 0; i < 5; i++) { @@ -52,13 +67,10 @@ TEST(test_transaction, test_multi_result_set) { MultiResultSet result_set(std::move(entities)); EXPECT_EQ(result_set.size(), 5); - EXPECT_EQ(result_set.mutable_at(2)->GetName(), "E2"); - EXPECT_EQ(result_set.at(3).GetName(), "E3"); - EXPECT_EQ(result_set.at(3).GetName(), "E3"); - - // discarding a [[nodiscard]] return value here (ok, we're only - // interested in the error anyway), so ignore when linting - EXPECT_THROW(result_set.at(15), std::out_of_range); // NOLINT + EXPECT_EQ(result_set.mutable_at(3)->GetName(), "E3"); + EXPECT_EQ(result_set.at(4).GetName(), "E4"); + EXPECT_EQ(result_set.at(4).GetName(), "E4"); + EXPECT_THROW(auto &e = result_set.at(15), std::out_of_range); int counter = 0; for (const auto &entity : result_set) { @@ -67,19 +79,6 @@ TEST(test_transaction, test_multi_result_set) { EXPECT_EQ(counter, 5); } -TEST(test_transaction, create_transaction) { - const auto *host = "localhost"; - auto configuration = InsecureConnectionConfiguration(host, 8000); - Connection connection(configuration); - auto transaction = connection.CreateTransaction(); - - ASSERT_EQ(StatusCode::GO_ON, transaction->RetrieveById("100")); - EXPECT_THROW_MESSAGE( - transaction->Execute(), ConnectionError, - "The attempt to execute this transaction was not successful because the " - "connection to the server could not be established."); -} - TEST(test_transaction, test_unavailable) { const auto *host = "localhost"; auto configuration = InsecureConnectionConfiguration(host, 8000); @@ -115,8 +114,9 @@ TEST(test_transaction, test_multi_result_set_empty) { TEST(test_transaction, test_multi_result_iterator) { std::vector<std::unique_ptr<Entity>> one_elem; RetrieveResponse response; - response.mutable_entity()->set_id("100"); - one_elem.push_back(std::make_unique<Entity>(response.release_entity())); + response.mutable_entity_response()->mutable_entity()->set_id("100"); + one_elem.push_back( + std::make_unique<Entity>(response.release_entity_response())); MultiResultSet rs(std::move(one_elem)); EXPECT_EQ(rs.size(), 1); @@ -129,8 +129,9 @@ TEST(test_transaction, test_multi_result_iterator) { TEST(test_transaction, test_multi_result_set_one) { std::vector<std::unique_ptr<Entity>> one_elem; RetrieveResponse response; - response.mutable_entity()->set_id("100"); - one_elem.push_back(std::make_unique<Entity>(response.release_entity())); + response.mutable_entity_response()->mutable_entity()->set_id("100"); + one_elem.push_back( + std::make_unique<Entity>(response.release_entity_response())); MultiResultSet rs(std::move(one_elem)); EXPECT_EQ(rs.size(), 1); @@ -143,14 +144,17 @@ TEST(test_transaction, test_multi_result_set_three) { MultiTransactionResponse response; response.add_responses() ->mutable_retrieve_response() + ->mutable_entity_response() ->mutable_entity() ->set_id("100"); - auto *entity_with_error = - response.add_responses()->mutable_retrieve_response()->mutable_entity(); - entity_with_error->set_id("101"); + auto *entity_with_error = response.add_responses() + ->mutable_retrieve_response() + ->mutable_entity_response(); + entity_with_error->mutable_entity()->set_id("101"); entity_with_error->add_errors()->set_code(1); response.add_responses() ->mutable_retrieve_response() + ->mutable_entity_response() ->mutable_entity() ->set_id("102"); @@ -158,7 +162,7 @@ TEST(test_transaction, test_multi_result_set_three) { std::vector<std::unique_ptr<Entity>> entities; for (auto sub_response : *responses) { three_elem.push_back(std::make_unique<Entity>( - sub_response.mutable_retrieve_response()->release_entity())); + sub_response.mutable_retrieve_response()->release_entity_response())); } MultiResultSet rs(std::move(three_elem)); @@ -191,4 +195,36 @@ TEST(test_transaction, test_multi_deletion) { } } +TEST(test_transaction, test_retrieve_and_download) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::INITIAL); + transaction->RetrieveAndDownloadFilesById("asdf", "local_path"); + + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::GO_ON); + + EXPECT_EQ(transaction->ExecuteAsynchronously(), StatusCode::EXECUTING); + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::CONNECTION_ERROR); +} + +TEST(test_transaction, test_insert_with_file) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + Entity entity; + entity.SetRole("File"); + entity.SetLocalPath(TEST_DATA_DIR + "/test.json"); + + EXPECT_TRUE(transaction->GetUploadFiles().empty()); + transaction->InsertEntity(&entity); + EXPECT_EQ(transaction->GetUploadFiles().size(), 1); + + transaction->ExecuteAsynchronously(); + EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR); +} + } // namespace caosdb::transaction