diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index a1aa5a871f09946b836159be80e16ba5c35e1934..4393d3628cf63743b0191857b7f25b1329b6297c 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -27,6 +27,7 @@ set(libcaosdb_INCL ${CMAKE_CURRENT_BINARY_DIR}/caosdb/constants.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/entity.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/exceptions.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/handler_interface.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/info.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/log_level.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/logging.h @@ -34,12 +35,14 @@ set(libcaosdb_INCL ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/protobuf_helper.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/status_code.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_status.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/unary_rpc_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/utility.h + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/register_file_upload_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/Client.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/UploadRequestHandler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/DownloadRequestHandler.h - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/HandlerInterface.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/FileWriter.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/FileReader.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/FileLock.h diff --git a/include/caosdb/file_transmission/Client.h b/include/caosdb/file_transmission/Client.h index 218a3e6859a2039932700a1df921602704b3315e..3a2b4a8fc487112a4d89d845d0340620261c2fc4 100644 --- a/include/caosdb/file_transmission/Client.h +++ b/include/caosdb/file_transmission/Client.h @@ -1,11 +1,12 @@ -#include "caosdb/entity.h" // for FileDescriptor -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... -#include "caosdb/file_transmission/HandlerInterface.h" // for HandlerInterface -#include "caosdb/status_code.h" // for StatusCode -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <memory> // for shared_ptr, uniqu... - -namespace FileExchange { +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/handler_interface.h" // for HandlerInterface +#include "caosdb/status_code.h" // for StatusCode +#include "caosdb/transaction_status.h" // for StatusCode +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <memory> // for shared_ptr, uniqu... + +namespace caosdb::transaction { using caosdb::StatusCode; using caosdb::entity::FileDescriptor; using caosdb::entity::v1alpha1::FileTransmissionService; @@ -26,7 +27,7 @@ public: StatusCode upload(const FileDescriptor &file_descriptor); StatusCode download(const FileDescriptor &file_descriptor); - void cancel(); + void Cancel(); private: int processMessages(); @@ -37,4 +38,4 @@ private: std::unique_ptr<HandlerInterface> handler_; }; -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/DownloadRequestHandler.h b/include/caosdb/file_transmission/DownloadRequestHandler.h index 0b19132fc07bd2afdc4ce42aee28ae28c6ee7645..a59f9067605ac15aa01f569b8cc10ccadd6e7fef 100644 --- a/include/caosdb/file_transmission/DownloadRequestHandler.h +++ b/include/caosdb/file_transmission/DownloadRequestHandler.h @@ -1,19 +1,22 @@ -#include "caosdb/entity.h" // for FileDescriptor -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... -#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse -#include "caosdb/file_transmission/FileWriter.h" // for FileWriter -#include "caosdb/file_transmission/HandlerInterface.h" // for HandlerTag, Handl... -#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncReader -#include <grpcpp/impl/codegen/client_context.h> // for ClientContext -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <grpcpp/impl/codegen/status.h> // for Status -#include <memory> // for unique_ptr - -namespace FileExchange { +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse +#include "caosdb/file_transmission/FileWriter.h" // for FileWriter +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncReader +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <memory> // for unique_ptr + +namespace caosdb::transaction { using caosdb::entity::FileDescriptor; using caosdb::entity::v1alpha1::FileDownloadRequest; using caosdb::entity::v1alpha1::FileDownloadResponse; using caosdb::entity::v1alpha1::FileTransmissionService; +using caosdb::transaction::HandlerInterface; +using caosdb::transaction::HandlerTag; class DownloadRequestHandler final : public HandlerInterface { public: @@ -28,9 +31,15 @@ public: DownloadRequestHandler(DownloadRequestHandler &&) = delete; DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete; - bool onNext(bool ok) override; + TransactionStatus GetStatus() override { + return TransactionStatus::EXECUTING(); + } - void cancel() override; + void Start() override { OnNext(true); } + + bool OnNext(bool ok) override; + + void Cancel() override; protected: enum class CallState { NewCall, SendingRequest, ReceivingFile, CallComplete }; @@ -61,4 +70,4 @@ protected: unsigned long long bytesReceived_; }; -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/FileError.h b/include/caosdb/file_transmission/FileError.h index 609fd394d95c43691a537333b4f8d323e6078f23..f9bae6083fc7f1db4791f82fee474302b77cd420 100644 --- a/include/caosdb/file_transmission/FileError.h +++ b/include/caosdb/file_transmission/FileError.h @@ -3,7 +3,7 @@ #include <stdexcept> #include <string> -namespace FileExchange { +namespace caosdb::transaction { class FileLockError : public std::runtime_error { public: @@ -21,4 +21,4 @@ public: : std::runtime_error(message) {} }; -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/FileLock.h b/include/caosdb/file_transmission/FileLock.h index 5fcf4d2d6a04f07edb7cb39223bbecc199eaf33e..40092ea49e842f8848da40c38ee4b26135c846aa 100644 --- a/include/caosdb/file_transmission/FileLock.h +++ b/include/caosdb/file_transmission/FileLock.h @@ -3,10 +3,10 @@ #include <mutex> #include <shared_mutex> -namespace FileExchange { +namespace caosdb::transaction { using FileMutex = std::shared_timed_mutex; using FileReadLock = std::shared_lock<FileMutex>; using FileWriteLock = std::unique_lock<FileMutex>; -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/FileReader.h b/include/caosdb/file_transmission/FileReader.h index c1264395eb1a43b20e7f7ddf3395c85b26dce396..4ddca922cf5304239086840cb268b1205b505bb2 100644 --- a/include/caosdb/file_transmission/FileReader.h +++ b/include/caosdb/file_transmission/FileReader.h @@ -8,7 +8,7 @@ #include <memory> // for shared_ptr #include <string> // for string -namespace FileExchange { +namespace caosdb::transaction { using boost::filesystem::exists; using boost::filesystem::ifstream; using boost::filesystem::path; @@ -42,4 +42,4 @@ private: FileReadLock lock_; }; -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/FileWriter.h b/include/caosdb/file_transmission/FileWriter.h index ee1589194b2c4eee6ac889d71eab5a4d7ea64438..0142d723910d361bee905c76c28d2157c7aebdb6 100644 --- a/include/caosdb/file_transmission/FileWriter.h +++ b/include/caosdb/file_transmission/FileWriter.h @@ -6,7 +6,7 @@ #include <memory> // for shared_ptr #include <string> // for string -namespace FileExchange { +namespace caosdb::transaction { class FileWriter final { public: @@ -34,4 +34,4 @@ private: FileWriteLock lock_; }; -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/HandlerInterface.h b/include/caosdb/file_transmission/HandlerInterface.h deleted file mode 100644 index e1ab99dd6e86522c148a6f5635f614d7ea8564bd..0000000000000000000000000000000000000000 --- a/include/caosdb/file_transmission/HandlerInterface.h +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#include <memory> -#include <string> - -namespace FileExchange { -const std::string logger_name("caosdb::transaction::file_transmission"); - -class HandlerInterface { -public: - virtual ~HandlerInterface() = default; - - virtual bool onNext(bool ok) = 0; - - virtual void cancel() = 0; -}; - -using HandlerPtr = std::unique_ptr<HandlerInterface>; -using HandlerTag = HandlerPtr *; - -} // namespace FileExchange diff --git a/include/caosdb/file_transmission/LICENSE b/include/caosdb/file_transmission/LICENSE deleted file mode 100644 index 83d0cd5808aaf27991c5206fc1beacd2bc0c8131..0000000000000000000000000000000000000000 --- a/include/caosdb/file_transmission/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2019 NeiRo21 - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/include/caosdb/file_transmission/UploadRequestHandler.h b/include/caosdb/file_transmission/UploadRequestHandler.h index 6803a1ae6018f120065bca65a59ed7d82e4d9387..8ee96c8a2875162bbbb615fdb2d30a2e47f82370 100644 --- a/include/caosdb/file_transmission/UploadRequestHandler.h +++ b/include/caosdb/file_transmission/UploadRequestHandler.h @@ -1,20 +1,23 @@ -#include "caosdb/entity.h" // for FileDescriptor -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... -#include "caosdb/entity/v1alpha1/main.pb.h" // for FileUploadRequest -#include "caosdb/file_transmission/FileReader.h" // for FileReader -#include "caosdb/file_transmission/HandlerInterface.h" // for HandlerTag, Handl... -#include <cstdint> // for uint64_t -#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWriter -#include <grpcpp/impl/codegen/client_context.h> // for ClientContext -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <grpcpp/impl/codegen/status.h> // for Status -#include <memory> // for unique_ptr - -namespace FileExchange { +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileUploadRequest +#include "caosdb/file_transmission/FileReader.h" // for FileReader +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <cstdint> // for uint64_t +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWriter +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <memory> // for unique_ptr + +namespace caosdb::transaction { using caosdb::entity::FileDescriptor; using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::entity::v1alpha1::FileUploadRequest; using caosdb::entity::v1alpha1::FileUploadResponse; +using caosdb::transaction::HandlerInterface; +using caosdb::transaction::HandlerTag; class UploadRequestHandler final : public HandlerInterface { public: @@ -29,9 +32,15 @@ public: UploadRequestHandler(UploadRequestHandler &&) = delete; UploadRequestHandler &operator=(UploadRequestHandler &&) = delete; - bool onNext(bool ok) override; + TransactionStatus GetStatus() override { + return TransactionStatus::EXECUTING(); + } - void cancel() override; + void Start() override { OnNext(true); } + + bool OnNext(bool ok) override; + + void Cancel() override; protected: enum class CallState { @@ -69,4 +78,4 @@ protected: uint64_t bytesToSend_; }; -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/include/caosdb/file_transmission/register_file_upload_handler.h b/include/caosdb/file_transmission/register_file_upload_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..0d41b947eebf92b50ac6b33b97f0dc67bf6bad9c --- /dev/null +++ b/include/caosdb/file_transmission/register_file_upload_handler.h @@ -0,0 +1,46 @@ +#pragma once +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include "caosdb/unary_rpc_handler.h" +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRespons... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <memory> // for unique_ptr + +namespace caosdb::transaction { + +using caosdb::entity::v1alpha1::FileTransmissionService; +using caosdb::entity::v1alpha1::RegisterFileUploadRequest; +using caosdb::entity::v1alpha1::RegisterFileUploadResponse; + +class RegisterFileUploadHandler final : public UnaryRpcHandler { +public: + RegisterFileUploadHandler(HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *completion_queue, + RegisterFileUploadRequest *request, + RegisterFileUploadResponse *response); + + ~RegisterFileUploadHandler(); + + RegisterFileUploadHandler(const RegisterFileUploadHandler &) = delete; + RegisterFileUploadHandler & + operator=(const RegisterFileUploadHandler &) = delete; + RegisterFileUploadHandler(RegisterFileUploadHandler &&) = delete; + RegisterFileUploadHandler &operator=(RegisterFileUploadHandler &&) = delete; + +protected: + void handleNewCallState() override; + void handleReceivingFileState() override; + + HandlerTag tag_; + + FileTransmissionService::Stub *stub_; + + std::unique_ptr<grpc::ClientAsyncResponseReader<RegisterFileUploadResponse>> + rpc_; + + RegisterFileUploadRequest *request_; + RegisterFileUploadResponse *response_; +}; + +} // namespace caosdb::transaction diff --git a/include/caosdb/handler_interface.h b/include/caosdb/handler_interface.h new file mode 100644 index 0000000000000000000000000000000000000000..ff1a42b047cc7f216cacc63c945e97adb709c66c --- /dev/null +++ b/include/caosdb/handler_interface.h @@ -0,0 +1,78 @@ +/* + * This file is a part of the CaosDB Project. + * Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com> + * Copyright (C) 2021 IndiScale GmbH <info@indiscale.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + ********************************************************************************* + * + * This is derived work which is heavily based on + * https://github.com/NeiRo21/grpcpp-bidi-streaming, Commit + * cd9cb78e5d6d72806c2ec4c703e5e856b223dc96, Aug 10, 2020 + * + * The orginal work is licensed as + * + * > MIT License + * > + * > Copyright (c) 2019 NeiRo21 + * > + * > Permission is hereby granted, free of charge, to any person obtaining a + * > copy of this software and associated documentation files (the "Software"), + * > to deal in the Software without restriction, including without limitation + * > the rights to use, copy, modify, merge, publish, distribute, sublicense, + * > and/or sell copies of the Software, and to permit persons to whom the + * > Software is furnished to do so, subject to the following conditions: + * > + * > The above copyright notice and this permission notice shall be included in + * > all copies or substantial portions of the Software. + * > + * > THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * > IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * > FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * > LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * > FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * > DEALINGS IN THE SOFTWARE. + */ +#ifndef CAOSDB_HANDLER_INTERFACE_H +#define CAOSDB_HANDLER_INTERFACE_H + +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <memory> +#include <string> + +namespace caosdb::transaction { + +const static std::string logger_name = "caosdb::transaction"; + +class HandlerInterface { +public: + virtual ~HandlerInterface() = default; + + virtual void Start() = 0; + + virtual bool OnNext(bool ok) = 0; + + virtual void Cancel() = 0; + + virtual TransactionStatus GetStatus() = 0; +}; + +using HandlerPtr = std::unique_ptr<HandlerInterface>; +using HandlerTag = HandlerPtr *; + +} // namespace caosdb::transaction + +#endif diff --git a/include/caosdb/status_code.h b/include/caosdb/status_code.h index fe42d7e7cc439ec91ac493b7a75bcbfb259045da..6cbb055d64c8e5f539be4a4f661556c5f276d077 100644 --- a/include/caosdb/status_code.h +++ b/include/caosdb/status_code.h @@ -41,6 +41,7 @@ enum StatusCode { INITIAL = -2, EXECUTING = -1, SUCCESS = 0, + // TODO(tf) Map other GRPC errors AUTHENTICATION_ERROR = 16, CONNECTION_ERROR = 14, GENERIC_RPC_ERROR = 20, diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index 3d394a1d1b37ffb980852cb5ff5400c96915a62b..368eb2e10d311ce862cce09c8c417c54da379a5a 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -21,24 +21,29 @@ #ifndef CAOSDB_TRANSACTION_H #define CAOSDB_TRANSACTION_H -#include "caosdb/entity.h" // for Entity, FileDe... -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransact... -#include "caosdb/entity/v1alpha1/main.pb.h" // for MultiTransacti... -#include "caosdb/logging.h" // for CAOSDB_LOG_ERR... -#include "caosdb/status_code.h" // for StatusCode -#include "caosdb/transaction_status.h" // for StatusCode -#include <boost/log/core/record.hpp> // for record -#include <boost/log/sources/record_ostream.hpp> // for basic_record_o... +#include "caosdb/entity.h" // for Entity, FileDe... +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransact... +#include "caosdb/entity/v1alpha1/main.pb.h" // for MultiTransacti... +#include "caosdb/handler_interface.h" // for HandlerInterface +#include "caosdb/transaction_handler.h" // for EntityTransactionHandler +#include "caosdb/logging.h" // for CAOSDB_LOG_ERR... +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for StatusCode +#include "caosdb/transaction_status.h" // for StatusCode +#include <boost/log/core/record.hpp> // for record +#include <boost/log/sources/record_ostream.hpp> // for basic_record_o... #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_E... #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_S... +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/util/json_util.h> // for MessageToJsonS... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <iterator> // for iterator, next +#include <map> // for map // IWYU pragma: no_include <ext/alloc_traits.h> -#include <google/protobuf/util/json_util.h> // for MessageToJsonS... -#include <iterator> // for iterator, next -#include <map> // for map -#include <memory> // for unique_ptr -#include <string> // for string -#include <utility> // for move -#include <vector> // for vector +#include <memory> // for unique_ptr +#include <string> // for string +#include <utility> // for move +#include <vector> // for vector /** * Do all necessary checks and assure that another retrieval (by id or by @@ -177,11 +182,11 @@ using caosdb::entity::v1alpha1::RegisterFileUploadResponse; using caosdb::transaction::TransactionStatus; using TransactionResponseCase = caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; +using caosdb::utility::get_arena; +using google::protobuf::Arena; class Transaction; -static const std::string logger_name = "caosdb::transaction"; - /** * Abstract base class for the results of a Transaction. */ @@ -265,6 +270,13 @@ public: Transaction(std::shared_ptr<EntityTransactionService::Stub> entity_service, std::shared_ptr<FileTransmissionService::Stub> file_service); + ~Transaction(); + + Transaction(const Transaction &) = delete; + Transaction &operator=(const Transaction &) = delete; + Transaction(Transaction &&) = delete; + Transaction &operator=(Transaction &&) = delete; + /** * Add an entity id to this transaction for retrieval and also download the * file. @@ -410,11 +422,45 @@ public: return out; } + /** + * Return the vector which holds all the files which are to be uploaded. + */ + inline auto GetUploadFiles() const -> const std::vector<FileDescriptor> & { + return upload_files; + } + +protected: + /** + * Await and process the current handler's results. + */ + auto ProcessCalls() -> TransactionStatus; + + /** + * Cancels any active handler and drains the completion_queue. + * + * Can stay protected until ExecuteAsynchronously() is actually asynchronous. + * Then it is also intended for aborting an execution after it has already + * started. + */ + auto Cancel() -> void; + + /** + * Return the Arena where this transaction may create Message instances. + * + * Currently, this implementation is only a call to + * caosdb::utility::get_arena(), but in the future we might want to have a + * smarter memory management. + */ + inline auto GetArena() const -> Arena * { return get_arena(); } + +private: + grpc::CompletionQueue completion_queue; + std::unique_ptr<HandlerInterface> handler_; + std::vector<FileDescriptor> upload_files; std::map<std::string, FileDescriptor> download_files; -private: - auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void; + // auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void; auto UploadFile(FileUploadResponse *response, const FileDescriptor &file_descriptor, const std::string ®istration_id) -> void; diff --git a/include/caosdb/transaction_handler.h b/include/caosdb/transaction_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..69df4f0b370c5cdcc9515a916e36c1c899fd3397 --- /dev/null +++ b/include/caosdb/transaction_handler.h @@ -0,0 +1,46 @@ +#pragma once +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse +#include "caosdb/handler_interface.h" // for HandlerTag +#include "caosdb/unary_rpc_handler.h" // for HandlerTag, Handl... +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRespons... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <memory> // for unique_ptr + +namespace caosdb::transaction { + +using caosdb::entity::v1alpha1::EntityTransactionService; +using caosdb::entity::v1alpha1::MultiTransactionRequest; +using caosdb::entity::v1alpha1::MultiTransactionResponse; + +class EntityTransactionHandler final : public UnaryRpcHandler { +public: + EntityTransactionHandler(HandlerTag tag, EntityTransactionService::Stub *stub, + grpc::CompletionQueue *completion_queue, + MultiTransactionRequest *request, + MultiTransactionResponse *response); + + ~EntityTransactionHandler() override = default; + + EntityTransactionHandler(const EntityTransactionHandler &) = delete; + EntityTransactionHandler & + operator=(const EntityTransactionHandler &) = delete; + EntityTransactionHandler(EntityTransactionHandler &&) = delete; + EntityTransactionHandler &operator=(EntityTransactionHandler &&) = delete; + +protected: + virtual void handleNewCallState() override; + virtual void handleReceivingFileState() override; + + HandlerTag tag_; + + EntityTransactionService::Stub *stub_; + + std::unique_ptr<grpc::ClientAsyncResponseReader<MultiTransactionResponse>> + rpc_; + + MultiTransactionRequest *request_; + MultiTransactionResponse *response_; +}; + +} // namespace caosdb::transaction diff --git a/include/caosdb/transaction_status.h b/include/caosdb/transaction_status.h index c018ef553fb7945fda07b779d2e7b7bbb71488df..ca40c578bc1c3dd2555ccda97c06b4da726e9c7b 100644 --- a/include/caosdb/transaction_status.h +++ b/include/caosdb/transaction_status.h @@ -170,6 +170,19 @@ public: " Original error: " + details); } + /** + * Factory for a GENERIC_ERROR status. + * + * This status means that the transaction failed due to errors which + * supposedly do not have a special handling. + */ + inline static auto GENERIC_ERROR(const std::string &details) { + return TransactionStatus( + StatusCode::GENERIC_ERROR, + caosdb::get_status_description(StatusCode::GENERIC_ERROR) + + "Original error: " + details); + } + inline auto ThrowExceptionIfError() const -> void { TransactionStatus::ThrowExceptionIfError(this->code, this->description); } @@ -192,7 +205,7 @@ public: case StatusCode::TRANSACTION_TYPE_ERROR: throw TransactionTypeError(description); default: - throw Exception(StatusCode::GENERIC_ERROR, description); + throw Exception(code, description); } } @@ -227,6 +240,9 @@ public: */ inline auto GetCode() const -> StatusCode { return this->code; } + TransactionStatus(StatusCode code, const std::string &description) + : code(code), description(description){}; + private: /** * The code is an identifier of errors. @@ -237,9 +253,6 @@ private: * Description of the error */ std::string description; - - TransactionStatus(StatusCode code, const std::string &description) - : code(code), description(description){}; }; } // namespace caosdb::transaction diff --git a/include/caosdb/unary_rpc_handler.h b/include/caosdb/unary_rpc_handler.h new file mode 100644 index 0000000000000000000000000000000000000000..816bd65683eeee0d5aebf34abb386c76cbb544fb --- /dev/null +++ b/include/caosdb/unary_rpc_handler.h @@ -0,0 +1,41 @@ +#pragma once +#include "caosdb/handler_interface.h" // for HandlerTag, Handl... +#include "caosdb/transaction_status.h" // for TransactionStatus +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status + +namespace caosdb::transaction { + +class UnaryRpcHandler : public HandlerInterface { +public: + inline UnaryRpcHandler(grpc::CompletionQueue *completion_queue) + : state_(CallState::NewCall), completion_queue(completion_queue), + transaction_status(TransactionStatus::EXECUTING()) {} + + void Start() override { + transaction_status = TransactionStatus::EXECUTING(); + OnNext(true); + } + + bool OnNext(bool ok) override; + + void Cancel() override; + + TransactionStatus GetStatus() override { return transaction_status; } + +protected: + virtual void handleNewCallState() = 0; + virtual void handleReceivingFileState() = 0; + void handleCallCompleteState(); + + enum class CallState { NewCall, ReceivingFile, CallComplete }; + CallState state_; + grpc::CompletionQueue *completion_queue; + + grpc::ClientContext call_context; + grpc::Status status_; + TransactionStatus transaction_status; +}; + +} // namespace caosdb::transaction diff --git a/proto b/proto index af178772ef43ded290168f33852ef3d85583b20a..485173a714d9ff7c2388945b0a4cad35980cda69 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit af178772ef43ded290168f33852ef3d85583b20a +Subproject commit 485173a714d9ff7c2388945b0a4cad35980cda69 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 57fc47ad5aac33d2aab2a591cfdd6d8696366e80..366875d669b57c4df3942aec5fa4658873754a86 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -28,6 +28,9 @@ set(libcaosdb_SRC ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/configuration.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/protobuf_helper.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_handler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/unary_rpc_handler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/register_file_upload_handler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/Client.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/UploadRequestHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/file_transmission/DownloadRequestHandler.cpp diff --git a/src/caosdb/file_transmission/Client.cpp b/src/caosdb/file_transmission/Client.cpp index 769cbb9c18891c5a0edad7589904b31d056f835c..9adabd60806537c35ac882fa601723a2cad37464 100644 --- a/src/caosdb/file_transmission/Client.cpp +++ b/src/caosdb/file_transmission/Client.cpp @@ -11,11 +11,11 @@ // IWYU pragma: no_include <bits/exception.h> #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQ... -namespace FileExchange { +namespace caosdb::transaction { using caosdb::StatusCode; FileExchangeClient::~FileExchangeClient() { - this->cancel(); + this->Cancel(); cq_.Shutdown(); @@ -49,21 +49,22 @@ StatusCode FileExchangeClient::download(const FileDescriptor &file_descriptor) { return StatusCode::SUCCESS; } -void FileExchangeClient::cancel() { +void FileExchangeClient::Cancel() { if (handler_) { - handler_->cancel(); + handler_->Cancel(); } } int FileExchangeClient::processMessages() { try { + handler_->Start(); void *tag = nullptr; bool ok = false; while (true) { if (cq_.Next(&tag, &ok)) { if (tag != nullptr) { // TODO(tf): assert - auto res = handler_->onNext(ok); + auto res = handler_->OnNext(ok); if (!res) { // TODO(tf): comment handler_.reset(); @@ -89,4 +90,4 @@ int FileExchangeClient::processMessages() { return 0; } -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/DownloadRequestHandler.cpp b/src/caosdb/file_transmission/DownloadRequestHandler.cpp index 57ce85c92540098331f669df7d05575993f818d7..87730febcf8589f30639123a18a65d60df05de90 100644 --- a/src/caosdb/file_transmission/DownloadRequestHandler.cpp +++ b/src/caosdb/file_transmission/DownloadRequestHandler.cpp @@ -22,7 +22,7 @@ #include <string> // for string, opera... #include <utility> // for move -namespace FileExchange { +namespace caosdb::transaction { using caosdb::StatusCode; using caosdb::exceptions::AuthenticationError; using caosdb::exceptions::ConnectionError; @@ -37,11 +37,9 @@ DownloadRequestHandler::DownloadRequestHandler( request_(Arena::CreateMessage<FileDownloadRequest>(get_arena())), response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())), state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), - bytesReceived_(0) { - this->onNext(true); -} + bytesReceived_(0) {} -bool DownloadRequestHandler::onNext(bool ok) { +bool DownloadRequestHandler::OnNext(bool ok) { try { if (ok) { if (state_ == CallState::NewCall) { @@ -81,7 +79,7 @@ bool DownloadRequestHandler::onNext(bool ok) { return true; } -void DownloadRequestHandler::cancel() { ctx_.TryCancel(); } +void DownloadRequestHandler::Cancel() { ctx_.TryCancel(); } void DownloadRequestHandler::handleNewCallState() { CAOSDB_LOG_TRACE(logger_name) @@ -90,17 +88,10 @@ void DownloadRequestHandler::handleNewCallState() { << ", download_id = " << file_descriptor_.file_transmission_id; fileWriter_ = std::make_unique<FileWriter>(file_descriptor_.local_path); - CAOSDB_DEBUG_MESSAGE_STRING(*request_, request_out) - CAOSDB_LOG_TRACE(logger_name) << "HERE1" << request_out; - CAOSDB_DEBUG_MESSAGE_STRING(*(file_descriptor_.file_transmission_id), - file_transmission_id_out) - CAOSDB_LOG_TRACE(logger_name) << "HERE2" << file_transmission_id_out; request_->mutable_file_transmission_id()->CopyFrom( *(file_descriptor_.file_transmission_id)); - CAOSDB_LOG_TRACE(logger_name) << "HERE2"; rpc_ = stub_->PrepareAsyncFileDownload(&ctx_, *request_, cq_); - CAOSDB_LOG_TRACE(logger_name) << "HERE3"; state_ = CallState::SendingRequest; rpc_->StartCall(tag_); @@ -162,4 +153,4 @@ void DownloadRequestHandler::handleCallCompleteState() { << "Leave DownloadRequestHandler::handleCallCompleteState"; } -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/FileReader.cpp b/src/caosdb/file_transmission/FileReader.cpp index 8ea983023f86cb8f71807fb0da3809696f92ab27..5124846c5f993fa3311c72f3445f53ea5482bba7 100644 --- a/src/caosdb/file_transmission/FileReader.cpp +++ b/src/caosdb/file_transmission/FileReader.cpp @@ -4,7 +4,7 @@ #include <mutex> // for try_to_lock #include <utility> // for move -namespace FileExchange { +namespace caosdb::transaction { FileReader::FileReader(boost::filesystem::path filename) : filename_(std::move(filename)), size_(0) { @@ -54,4 +54,4 @@ std::size_t FileReader::read(std::string &buffer) { return bytesRead; } -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/FileWriter.cpp b/src/caosdb/file_transmission/FileWriter.cpp index 3956ee22d26516b1aa7067451c1c2ff631bad9ca..3d89c1c6f2343ca5a39a167ad2dc88ff5a4e3aa2 100644 --- a/src/caosdb/file_transmission/FileWriter.cpp +++ b/src/caosdb/file_transmission/FileWriter.cpp @@ -4,7 +4,7 @@ #include <mutex> // for try_to_lock #include <utility> // for move -namespace FileExchange { +namespace caosdb::transaction { FileWriter::FileWriter(boost::filesystem::path filename) : filename_(std::move(filename)) { @@ -40,4 +40,4 @@ void FileWriter::write(const std::string &buffer) { } } -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/LICENSE b/src/caosdb/file_transmission/LICENSE deleted file mode 100644 index 83d0cd5808aaf27991c5206fc1beacd2bc0c8131..0000000000000000000000000000000000000000 --- a/src/caosdb/file_transmission/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2019 NeiRo21 - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/src/caosdb/file_transmission/UploadRequestHandler.cpp b/src/caosdb/file_transmission/UploadRequestHandler.cpp index 526362c102d90f7431dc9871b927a06129b98821..c96115d29215c1189833fe1b774d48070c1391c3 100644 --- a/src/caosdb/file_transmission/UploadRequestHandler.cpp +++ b/src/caosdb/file_transmission/UploadRequestHandler.cpp @@ -24,7 +24,7 @@ #include <string> // for basic_string #include <utility> // for move -namespace FileExchange { +namespace caosdb::transaction { using caosdb::StatusCode; using caosdb::exceptions::AuthenticationError; using caosdb::exceptions::ConnectionError; @@ -40,11 +40,9 @@ UploadRequestHandler::UploadRequestHandler(HandlerTag tag, request_(Arena::CreateMessage<FileUploadRequest>(get_arena())), response_(Arena::CreateMessage<FileUploadResponse>(get_arena())), state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), - bytesToSend_(0) { - this->onNext(true); -} + bytesToSend_(0) {} -bool UploadRequestHandler::onNext(bool ok) { +bool UploadRequestHandler::OnNext(bool ok) { try { if (state_ == CallState::CallComplete) { this->handleCallCompleteState(); @@ -84,7 +82,7 @@ bool UploadRequestHandler::onNext(bool ok) { return true; } -void UploadRequestHandler::cancel() { ctx_.TryCancel(); } +void UploadRequestHandler::Cancel() { ctx_.TryCancel(); } void UploadRequestHandler::handleNewCallState() { auto filename = file_descriptor_.local_path; @@ -157,4 +155,4 @@ void UploadRequestHandler::handleCallCompleteState() { } } -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/src/caosdb/file_transmission/register_file_upload_handler.cpp b/src/caosdb/file_transmission/register_file_upload_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2661756cdc565516a7ec6dfa74fde36bf2f1cc7e --- /dev/null +++ b/src/caosdb/file_transmission/register_file_upload_handler.cpp @@ -0,0 +1,38 @@ +#include "caosdb/file_transmission/register_file_upload_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include <boost/log/core/record.hpp> // for record +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue + +namespace caosdb::transaction { + +RegisterFileUploadHandler::~RegisterFileUploadHandler() = default; + +RegisterFileUploadHandler::RegisterFileUploadHandler( + HandlerTag tag, FileTransmissionService::Stub *stub, + grpc::CompletionQueue *completion_queue, RegisterFileUploadRequest *request, + RegisterFileUploadResponse *response) + : UnaryRpcHandler(completion_queue), tag_(tag), stub_(stub), + request_(request), response_(response) {} + +void RegisterFileUploadHandler::handleNewCallState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter RegisterFileUploadHandler::handleNewCallState."; + + rpc_ = stub_->PrepareAsyncRegisterFileUpload(&call_context, *request_, + completion_queue); + + state_ = CallState::CallComplete; + rpc_->StartCall(); + rpc_->Finish(response_, &status_, tag_); + + CAOSDB_LOG_TRACE(logger_name) + << "Leave RegisterFileUploadHandler::handleNewCallState"; +} + +void RegisterFileUploadHandler::handleReceivingFileState() {} + +} // namespace caosdb::transaction diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 9c90517278b5c4ef7f3b667f4ef01950577f963f..377b3b0febaec8086cc385aa6d1b815bb49fa335 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -18,12 +18,14 @@ * */ #include "caosdb/transaction.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac... -#include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe... -#include "caosdb/file_transmission/Client.h" // for FileExchangeC... -#include "caosdb/logging.h" // for CAOSDB_LOG_FATAL -#include "caosdb/protobuf_helper.h" // for get_arena -#include "caosdb/status_code.h" // for StatusCode +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac... +#include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe... +#include "caosdb/file_transmission/Client.h" // for FileExchangeC... +#include "caosdb/file_transmission/register_file_upload_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_FATAL +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for StatusCode +#include "caosdb/transaction_handler.h" #include <algorithm> // for max #include <boost/filesystem/path.hpp> // for operator<<, path #include <boost/log/core/record.hpp> // for record @@ -31,19 +33,16 @@ #include <boost/log/sources/record_ostream.hpp> // for basic_record_... #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... -#include <cassert> // for assert -#include <google/protobuf/arena.h> // for Arena -#include <grpcpp/grpcpp.h> // for CompletionQueue -#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRe... -#include <grpcpp/impl/codegen/client_context.h> // for ClientContext -#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue -#include <grpcpp/impl/codegen/status.h> // for Status -#include <grpcpp/impl/codegen/status_code_enum.h> // for StatusCode -#include <iosfwd> // for streamsize -#include <map> // for map, operator!= -#include <memory> // for unique_ptr -#include <stdexcept> // for out_of_range -#include <utility> // for move, pair +// IWYU pragma: no_include <bits/exception.h> +#include <exception> // IWYU pragma: keep +#include <google/protobuf/arena.h> // for Arena +#include <grpc/impl/codegen/gpr_types.h> +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <iosfwd> // for streamsize +#include <map> // for map, operator!= +#include <memory> // for unique_ptr +#include <stdexcept> // for out_of_range +#include <utility> // for move, pair namespace caosdb { @@ -116,12 +115,10 @@ using TransactionResponseCase = caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; using RetrieveResponseCase = caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase; -using caosdb::utility::get_arena; -using grpc::ClientAsyncResponseReader; using ProtoEntity = caosdb::entity::v1alpha1::Entity; -using FileExchange::FileExchangeClient; using google::protobuf::Arena; -using grpc::CompletionQueue; +using NextStatus = grpc::CompletionQueue::NextStatus; +using RegistrationStatus = caosdb::entity::v1alpha1::RegistrationStatus; ResultSet::iterator::iterator(const ResultSet *result_set_param, int index) : current_index(index), result_set(result_set_param) {} @@ -159,8 +156,8 @@ MultiResultSet::MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set) Transaction::Transaction( std::shared_ptr<EntityTransactionService::Stub> entity_service, std::shared_ptr<FileTransmissionService::Stub> file_service) - : request(Arena::CreateMessage<MultiTransactionRequest>(get_arena())), - response(Arena::CreateMessage<MultiTransactionResponse>(get_arena())) { + : request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())), + response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())) { this->entity_service = std::move(entity_service); this->file_service = std::move(file_service); this->query_count = -1; @@ -285,14 +282,26 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // upload files first if (!upload_files.empty()) { + CAOSDB_LOG_INFO(logger_name) + << "Number of files to be uploaded: " << upload_files.size(); + + // TODO(tf): Use Arena + auto *registration_request = + Arena::CreateMessage<RegisterFileUploadRequest>(GetArena()); auto *registration_response = - Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); - RegisterUploadFile(registration_response); + Arena::CreateMessage<RegisterFileUploadResponse>(GetArena()); + + handler_ = std::make_unique<RegisterFileUploadHandler>( + &handler_, file_service.get(), &completion_queue, registration_request, + registration_response); + this->status = ProcessCalls(); + + if (registration_response->status() != + RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) { + this->status = TransactionStatus::FILE_UPLOAD_ERROR(); + return StatusCode::EXECUTING; + } - // TODO(tf): if(registration_response.status != - // REGISTRATION_STATUS_ACCEPTED){ - // return StatusCode::FILE_UPLOAD_FAILED - // } FileExchangeClient upload_client(file_service); for (auto file_descriptor : upload_files) { file_descriptor.file_transmission_id->set_registration_id( @@ -300,50 +309,17 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; auto file_upload_status = upload_client.upload(file_descriptor); - if (static_cast<int>(static_cast<int>((file_upload_status) == 0) == - StatusCode::SUCCESS) != 0) { + if (file_upload_status != StatusCode::SUCCESS) { this->status = TransactionStatus::FILE_UPLOAD_ERROR(); return StatusCode::EXECUTING; } } } - grpc::Status grpc_status; - CompletionQueue cq; - - grpc::ClientContext context; - std::unique_ptr<ClientAsyncResponseReader<MultiTransactionResponse>> rpc( - this->entity_service->PrepareAsyncMultiTransaction(&context, - *(this->request), &cq)); - rpc->StartCall(); - - int tag = 1; - void *send_tag = static_cast<void *>(&tag); - rpc->Finish(this->response, &grpc_status, send_tag); - void *recv_tag = nullptr; - bool ok = false; + handler_ = std::make_unique<EntityTransactionHandler>( + &handler_, entity_service.get(), &completion_queue, request, response); - // TODO(tf) make this actually asynchronous by moving this to WaitForIt() - cq.Next(&recv_tag, &ok); - assert(recv_tag == send_tag); - assert(ok); - - if (!grpc_status.ok()) { - switch (grpc_status.error_code()) { - case grpc::StatusCode::UNAUTHENTICATED: - this->status = TransactionStatus::AUTHENTICATION_ERROR(); - break; - case grpc::StatusCode::UNAVAILABLE: - this->status = TransactionStatus::CONNECTION_ERROR(); - break; - default: - auto error_details = std::to_string(grpc_status.error_code()) + " - " + - grpc_status.error_message(); - this->status = TransactionStatus::RPC_ERROR(error_details); - } - } else { - this->status = TransactionStatus::SUCCESS(); - } + this->status = ProcessCalls(); // file download afterwards if (status.GetCode() == StatusCode::SUCCESS && !download_files.empty()) { @@ -364,14 +340,6 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { } } - auto *registration_response = - Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); - RegisterUploadFile(registration_response); - - // TODO(tf): if(registration_response.status != - // REGISTRATION_STATUS_ACCEPTED){ - // return StatusCode::FILE_UPLOAD_FAILED - // } FileExchangeClient download_client(file_service); for (const auto &item : download_files) { auto file_descriptor(item.second); @@ -379,10 +347,8 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path << ", " << out; auto file_download_status = download_client.download(file_descriptor); - if (static_cast<int>(static_cast<int>((file_download_status) == 0) == - StatusCode::SUCCESS) != 0) { + if (file_download_status != StatusCode::SUCCESS) { this->status = TransactionStatus::FILE_DOWNLOAD_ERROR(); - // TODO (tf) handle multiple errors? return StatusCode::EXECUTING; } } @@ -483,28 +449,75 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { return this->status; } -auto Transaction::RegisterUploadFile(RegisterFileUploadResponse *response) - -> void { - grpc::Status grpc_status; - CompletionQueue cq; +auto Transaction::ProcessCalls() -> TransactionStatus { + gpr_timespec deadline; + deadline.tv_sec = 1; + deadline.tv_nsec = 0; + deadline.clock_type = gpr_clock_type::GPR_TIMESPAN; - RegisterFileUploadRequest request; + TransactionStatus result = TransactionStatus::EXECUTING(); + handler_->Start(); + void *tag = nullptr; + bool ok = false; + while (true) { + switch (completion_queue.AsyncNext(&tag, &ok, deadline)) { + case NextStatus::GOT_EVENT: { + if (tag != nullptr) { + auto res = handler_->OnNext(ok); + if (!res) { + // The handler has finished it's work + result = handler_->GetStatus(); + handler_.reset(); + return result; + } + } else { + std::string description("Invalid tag delivered by notification queue."); + CAOSDB_LOG_ERROR(logger_name) << description; + handler_.reset(); + return TransactionStatus::RPC_ERROR(description); + } + } break; + case NextStatus::SHUTDOWN: { + CAOSDB_LOG_ERROR(logger_name) + << "Notification queue has been shut down unexpectedly."; + result = handler_->GetStatus(); + handler_.reset(); + return result; + } break; + case NextStatus::TIMEOUT: { + CAOSDB_LOG_DEBUG(logger_name) << "Timeout, waiting..."; + } break; + default: + CAOSDB_LOG_FATAL(logger_name) + << "Got an invalid NextStatus from CompletionQueue."; + result = handler_->GetStatus(); + handler_.reset(); + return result; + } + } + result = handler_->GetStatus(); + handler_.reset(); + return result; +} - grpc::ClientContext context; - std::unique_ptr<ClientAsyncResponseReader<RegisterFileUploadResponse>> rpc( - this->file_service->PrepareAsyncRegisterFileUpload(&context, request, &cq)); +Transaction::~Transaction() { + this->Cancel(); - rpc->StartCall(); + completion_queue.Shutdown(); - int tag = 1; - void *send_tag = static_cast<void *>(&tag); - rpc->Finish(response, &grpc_status, send_tag); - void *recv_tag = nullptr; + // drain the queue + void *ignoredTag = nullptr; bool ok = false; + while (completion_queue.Next(&ignoredTag, &ok)) { + ; + } +} - cq.Next(&recv_tag, &ok); - assert(recv_tag == send_tag); - assert(ok); +void Transaction::Cancel() { + // TODO(tf) State Canceled + if (handler_ != nullptr) { + handler_->Cancel(); + } } } // namespace caosdb::transaction diff --git a/src/caosdb/transaction_handler.cpp b/src/caosdb/transaction_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..61b69a2e52d9c675d78c96e37329ae57f8cda04d --- /dev/null +++ b/src/caosdb/transaction_handler.cpp @@ -0,0 +1,42 @@ +#include "caosdb/transaction_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include <boost/log/core/record.hpp> // for record +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <exception> // IWYU pragma: keep +// IWYU pragma: no_include <bits/exception.h> +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes... +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue + +namespace caosdb::transaction { + +EntityTransactionHandler::EntityTransactionHandler( + HandlerTag tag, EntityTransactionService::Stub *stub, + grpc::CompletionQueue *completion_queue, MultiTransactionRequest *request, + MultiTransactionResponse *response) + : UnaryRpcHandler(completion_queue), tag_(tag), stub_(stub), + request_(request), response_(response) {} + +void EntityTransactionHandler::handleNewCallState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter EntityTransactionHandler::handleNewCallState with " + "CompletionQueue " + << completion_queue; + + rpc_ = stub_->PrepareAsyncMultiTransaction(&call_context, *request_, + completion_queue); + + state_ = CallState::CallComplete; + rpc_->StartCall(); + rpc_->Finish(response_, &status_, tag_); + + CAOSDB_LOG_TRACE(logger_name) + << "Leave EntityTransactionHandler::handleNewCallState"; +} + +void EntityTransactionHandler::handleReceivingFileState() { + // TODO(tf) remove +} + +} // namespace caosdb::transaction diff --git a/src/caosdb/unary_rpc_handler.cpp b/src/caosdb/unary_rpc_handler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b41c13af66cff101fc91fa0ee1d283ffc0289483 --- /dev/null +++ b/src/caosdb/unary_rpc_handler.cpp @@ -0,0 +1,90 @@ +#include "caosdb/unary_rpc_handler.h" +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include "caosdb/status_code.h" // for GENERIC_RPC_E... +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +// IWYU pragma: no_include <bits/exception.h> +#include <exception> // IWYU pragma: keep +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT... +#include <iosfwd> // for streamsize +#include <string> // for string, opera... + +namespace caosdb::transaction { + +bool UnaryRpcHandler::OnNext(bool ok) { + try { + if (ok) { + if (state_ == CallState::NewCall) { + this->handleNewCallState(); + } else if (state_ == CallState::ReceivingFile) { + this->handleReceivingFileState(); + } else if (state_ == CallState::CallComplete) { + this->handleCallCompleteState(); + return false; + } + } else { + CAOSDB_LOG_ERROR(logger_name) + << "UnaryRpcHandler::OnNext(false)!. This should not happen."; + // TODO(tf) Handle this error: + // in CallComplete state: "Client-side Finish: ok should always be true" + // in ReceivingFile state: "ok indicates that the RPC is going to go to + // the wire. If it is false, it not going to the wire. This would happen + // if the channel is either permanently broken or transiently broken but + // with the fail-fast option. (Note that async unary RPCs don't post a CQ + // tag at this point, nor do client-streaming or bidi-streaming RPCs that + // have the initial metadata corked option set.)" + } + + return true; + } catch (std::exception &e) { + CAOSDB_LOG_ERROR(logger_name) + << "UnaryRpcHandler caught an exception: " << e.what(); + transaction_status = TransactionStatus::GENERIC_ERROR(e.what()); + state_ = CallState::CallComplete; + } catch (...) { + CAOSDB_LOG_ERROR(logger_name) + << "Transaction error: unknown exception caught"; + transaction_status = TransactionStatus::GENERIC_ERROR( + "UnaryRpcHandler caught an unknown exception"); + state_ = CallState::CallComplete; + } + + if (state_ != CallState::NewCall) { + call_context.TryCancel(); + } + + return false; +} + +void UnaryRpcHandler::Cancel() { call_context.TryCancel(); } + +void UnaryRpcHandler::handleCallCompleteState() { + CAOSDB_LOG_TRACE(logger_name) + << "Enter UnaryRpcHandler::handleCallCompleteState"; + + switch (status_.error_code()) { + case grpc::OK: + transaction_status = TransactionStatus::SUCCESS(); + CAOSDB_LOG_INFO(logger_name) << "UnaryRpcHandler finished successfully."; + break; + default: + auto code(static_cast<StatusCode>(status_.error_code())); + std::string description(get_status_description(code) + + " Original message: " + status_.error_message()); + transaction_status = TransactionStatus(code, description); + CAOSDB_LOG_ERROR(logger_name) + << "UnaryRpcHandler finished with an error (Code " << code + << "): " << description; + break; + } + + CAOSDB_LOG_TRACE(logger_name) + << "Leave UnaryRpcHandler::handleCallCompleteState"; +} + +} // namespace caosdb::transaction diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index 80f08457665f42d61cb2463397bfb306ad1a0331..c5afd8741d9803a9899a229c12d53f71a266317e 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -10,7 +10,7 @@ namespace fs = boost::filesystem; -namespace FileExchange { +namespace caosdb::transaction { class test_file_transmission : public ::testing::Test { protected: @@ -41,4 +41,4 @@ TEST_F(test_file_transmission, test_file_writer_reader) { } } -} // namespace FileExchange +} // namespace caosdb::transaction diff --git a/test/test_transaction.cpp b/test/test_transaction.cpp index b4b1c88bc41279de26063b624ac8660b362139d7..e3787966fb4e801d9d08afe692a94b3a39c1431e 100644 --- a/test/test_transaction.cpp +++ b/test/test_transaction.cpp @@ -23,26 +23,40 @@ #include "caosdb/entity/v1alpha1/main.pb.h" // for Entity #include "caosdb/exceptions.h" // for ConnectionError #include "caosdb/status_code.h" -#include "caosdb/transaction.h" // for Transaction -#include "caosdb/transaction_status.h" // for ConnectionError -#include "caosdb_test_utility.h" // for EXPECT_THROW_MESSAGE -#include "gtest/gtest-message.h" // for Message -#include "gtest/gtest-test-part.h" // for SuiteApiResolver, TestPa... -#include "gtest/gtest_pred_impl.h" // for Test, TestInfo, TEST -#include <memory> // for allocator, unique_ptr -#include <stdexcept> // for out_of_range -#include <string> // for string, basic_string -#include <utility> // for move -#include <vector> // for vector +#include "caosdb/transaction.h" // for Transaction +#include "caosdb/transaction_handler.h" // for MultiTransactionResponse +#include "caosdb/transaction_status.h" // for ConnectionError +#include "caosdb_test_utility.h" // for EXPECT_THROW_MESSAGE +#include "gtest/gtest-message.h" // for Message +#include "gtest/gtest-test-part.h" // for SuiteApiResolver, TestPa... +#include "gtest/gtest_pred_impl.h" // for Test, TestInfo, TEST +#include <memory> // for allocator, unique_ptr +#include <stdexcept> // for out_of_range +#include <string> // for string, basic_string +#include <utility> // for move +#include <vector> // for vector namespace caosdb::transaction { using caosdb::configuration::InsecureConnectionConfiguration; using caosdb::connection::Connection; using caosdb::entity::Entity; -using caosdb::exceptions::ConnectionError; using ProtoEntity = caosdb::entity::v1alpha1::Entity; using caosdb::entity::v1alpha1::RetrieveResponse; +TEST(test_transaction, create_transaction) { + const auto *host = "localhost"; + auto configuration = InsecureConnectionConfiguration(host, 8000); + Connection connection(configuration); + auto transaction = connection.CreateTransaction(); + + ASSERT_EQ(StatusCode::GO_ON, transaction->RetrieveById("100")); + EXPECT_THROW_MESSAGE( + transaction->Execute(), ConnectionError, + "The attempt to execute this transaction was not successful because the " + "connection to the server could not be established. " + "Original message: failed to connect to all addresses"); +} + TEST(test_transaction, test_multi_result_set) { std::vector<std::unique_ptr<Entity>> entities; for (int i = 0; i < 5; i++) { @@ -64,19 +78,6 @@ TEST(test_transaction, test_multi_result_set) { EXPECT_EQ(counter, 5); } -TEST(test_transaction, create_transaction) { - const auto *host = "localhost"; - auto configuration = InsecureConnectionConfiguration(host, 8000); - Connection connection(configuration); - auto transaction = connection.CreateTransaction(); - - ASSERT_EQ(StatusCode::GO_ON, transaction->RetrieveById("100")); - EXPECT_THROW_MESSAGE( - transaction->Execute(), ConnectionError, - "The attempt to execute this transaction was not successful because the " - "connection to the server could not be established."); -} - TEST(test_transaction, test_unavailable) { const auto *host = "localhost"; auto configuration = InsecureConnectionConfiguration(host, 8000); @@ -217,9 +218,9 @@ TEST(test_transaction, test_insert_with_file) { entity.SetRole("File"); entity.SetLocalPath(TEST_DATA_DIR + "/test.json"); - EXPECT_TRUE(transaction->upload_files.empty()); + EXPECT_TRUE(transaction->GetUploadFiles().empty()); transaction->InsertEntity(&entity); - EXPECT_EQ(transaction->upload_files.size(), 1); + EXPECT_EQ(transaction->GetUploadFiles().size(), 1); transaction->ExecuteAsynchronously(); EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR);