diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index a37100eaa065bc9757ca3f504c839a716adb9759..c08b7d67238979a5c2007b6a58d38734538f5b49 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -40,12 +40,10 @@ set(libcaosdb_INCL ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/UploadRequestHandler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/DownloadRequestHandler.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/HandlerInterface.h - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/RequestStatus.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileWriter.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileReader.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileLock.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileError.h - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileManager.h ) # pass variable to parent scope diff --git a/include/caosdb/entity.h b/include/caosdb/entity.h index 28d54464f46f3c75d51078338b47118ae304759f..dac3e73d363a161ea69b3610d3bd2301aa94da72 100644 --- a/include/caosdb/entity.h +++ b/include/caosdb/entity.h @@ -29,17 +29,23 @@ #ifndef CAOSDB_ENTITY_H #define CAOSDB_ENTITY_H -#include "caosdb/entity/v1alpha1/main.pb.h" // for ProtoEntity, ProtoParent... -#include "caosdb/status_code.h" -#include "caosdb/logging.h" -#include "google/protobuf/message.h" // for RepeatedPtrField -#include "caosdb/message_code.h" // for get_message_code, Messag... -#include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso... -#include <boost/filesystem/path.hpp> // for path -#include <boost/filesystem.hpp> -#include <random> -#include <string> // for string -#include <stdexcept> // for out_of_range +#include "caosdb/entity/v1alpha1/main.pb.h" // for RepeatedPtrField +#include "caosdb/logging.h" // for CAOSDB_LOG_WARN +#include "caosdb/message_code.h" // for get_message_code +#include "caosdb/status_code.h" // for StatusCode +#include <boost/filesystem/operations.hpp> // for exists, is_di... +#include <boost/filesystem/path.hpp> // for path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <google/protobuf/message.h> // for RepeatedPtrField +#include <google/protobuf/util/json_util.h> // for MessageToJson... +#include <iosfwd> // for streamsize +#include <random> // for mt19937, rand... +#include <stdexcept> // for out_of_range +#include <string> // for string, basic... namespace caosdb::entity { using boost::filesystem::exists; diff --git a/include/caosdb/filestreaming/Client.h b/include/caosdb/filestreaming/Client.h index 28507bf50043a3ad2330c8797869191b318511c3..35afc97069fb36523ce5e7fb4fb3223b709d6e3e 100644 --- a/include/caosdb/filestreaming/Client.h +++ b/include/caosdb/filestreaming/Client.h @@ -1,13 +1,9 @@ -#include "caosdb/filestreaming/HandlerInterface.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" -#include "caosdb/entity.h" -#include "caosdb/status_code.h" - -#include <grpcpp/grpcpp.h> - -#include <iostream> -#include <memory> -#include <string> +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/filestreaming/HandlerInterface.h" // for HandlerInterface +#include "caosdb/status_code.h" // for StatusCode +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <memory> // for shared_ptr, uniqu... namespace FileExchange { using caosdb::StatusCode; diff --git a/include/caosdb/filestreaming/DownloadRequestHandler.h b/include/caosdb/filestreaming/DownloadRequestHandler.h index dbb0d2e422b6111f63fd158a1cb8a15a7ba46025..45e52c5615575e05096eebe6f1eb17c4311607ae 100644 --- a/include/caosdb/filestreaming/DownloadRequestHandler.h +++ b/include/caosdb/filestreaming/DownloadRequestHandler.h @@ -1,9 +1,13 @@ -#include "caosdb/filestreaming/HandlerInterface.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" -#include "caosdb/entity.h" -#include "caosdb/filestreaming/FileWriter.h" - -#include <grpcpp/grpcpp.h> +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileDownloadResponse +#include "caosdb/filestreaming/FileWriter.h" // for FileWriter +#include "caosdb/filestreaming/HandlerInterface.h" // for HandlerTag, Handl... +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncReader +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <memory> // for unique_ptr namespace FileExchange { using caosdb::entity::FileDescriptor; @@ -11,11 +15,11 @@ using caosdb::entity::v1alpha1::FileDownloadRequest; using caosdb::entity::v1alpha1::FileDownloadResponse; using caosdb::entity::v1alpha1::FileTransmissionService; -class DownloadRequestHandler : public HandlerInterface { +class DownloadRequestHandler final : public HandlerInterface { public: DownloadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub, grpc::CompletionQueue *cq, - const FileDescriptor &file_descriptor); + FileDescriptor file_descriptor); ~DownloadRequestHandler() override = default; diff --git a/include/caosdb/filestreaming/FileManager.h b/include/caosdb/filestreaming/FileManager.h deleted file mode 100644 index 2f67e43f5c42acefb3adbfb8da121cd03310a4a3..0000000000000000000000000000000000000000 --- a/include/caosdb/filestreaming/FileManager.h +++ /dev/null @@ -1,53 +0,0 @@ -#pragma once - -#include "FileLock.h" -#include "FileReader.h" -#include "FileWriter.h" - -#include <memory> -#include <mutex> -#include <string> -#include <unordered_map> - -namespace FileExchange { - -class FileManager final { -public: - FileManager() = default; - - FileManager(const std::string &filename); - - template <class InputIt> FileManager(InputIt first, InputIt last); - - ~FileManager() = default; - - // FileManager is not copyable - FileManager(const FileManager &) = delete; - FileManager &operator=(const FileManager &) = delete; - - // FileManager is movable - FileManager(FileManager &&) = default; - FileManager &operator=(FileManager &&) = default; - - bool isManaged(const std::string &filename) const; - - std::unique_ptr<FileReader> readFile(const std::string &filename) const; - std::unique_ptr<FileWriter> writeFile(const std::string &filename); - -private: - void manageFile(const std::string &filename); - - mutable std::unordered_map<std::string, std::shared_ptr<FileMutex>> - fileMutexes_; - mutable std::mutex mgrMutex_; -}; - -template <class InputIt> FileManager::FileManager(InputIt first, InputIt last) { - while (first != last) { - const auto &filename = *first; - this->manageFile(filename); - ++first; - } -} - -} // namespace FileExchange diff --git a/include/caosdb/filestreaming/FileReader.h b/include/caosdb/filestreaming/FileReader.h index 3d12d42c497337cc16e029d3c83d0812f9eb9796..ff0ac49c48186c0481f9547c8946b7c71d68b562 100644 --- a/include/caosdb/filestreaming/FileReader.h +++ b/include/caosdb/filestreaming/FileReader.h @@ -1,13 +1,12 @@ #pragma once -#include "caosdb/filestreaming/FileLock.h" - -#include <boost/filesystem.hpp> -#include <boost/filesystem/fstream.hpp> -#include <boost/filesystem/string_file.hpp> -#include <fstream> -#include <memory> -#include <string> +#include "caosdb/filestreaming/FileLock.h" // for FileMutex, FileReadLock +#include <boost/filesystem/fstream.hpp> // for ifstream +#include <boost/filesystem/operations.hpp> // for exists +#include <boost/filesystem/path.hpp> // for path +#include <fstream> // for ifstream, size_t +#include <memory> // for shared_ptr +#include <string> // for string namespace FileExchange { using boost::filesystem::exists; @@ -16,9 +15,9 @@ using boost::filesystem::path; class FileReader final { public: - FileReader(const boost::filesystem::path &filename); - FileReader(const boost::filesystem::path &filename, - const std::shared_ptr<FileMutex> &mutexPtr); + FileReader(boost::filesystem::path filename); + FileReader(boost::filesystem::path filename, + std::shared_ptr<FileMutex> mutexPtr); ~FileReader() = default; diff --git a/include/caosdb/filestreaming/FileWriter.h b/include/caosdb/filestreaming/FileWriter.h index 1ef9bd55445514e118067d48d976e7b2b7a858b7..a757fe849e6bbf5c0e952bc097b4559872806971 100644 --- a/include/caosdb/filestreaming/FileWriter.h +++ b/include/caosdb/filestreaming/FileWriter.h @@ -1,22 +1,18 @@ #pragma once -#include "caosdb/filestreaming/FileLock.h" - -#include <boost/filesystem.hpp> -#include <boost/filesystem/fstream.hpp> -#include <boost/filesystem/string_file.hpp> - -#include <fstream> -#include <memory> -#include <string> +#include "caosdb/filestreaming/FileLock.h" // for FileMutex, FileWriteLock +#include <boost/filesystem/path.hpp> // for path +#include <fstream> // for ofstream +#include <memory> // for shared_ptr +#include <string> // for string namespace FileExchange { class FileWriter final { public: - FileWriter(const boost::filesystem::path &filename); - FileWriter(const boost::filesystem::path &filename, - const std::shared_ptr<FileMutex> &mutexPtr); + FileWriter(boost::filesystem::path filename); + FileWriter(boost::filesystem::path filename, + std::shared_ptr<FileMutex> mutexPtr); ~FileWriter() = default; diff --git a/include/caosdb/filestreaming/RequestStatus.h b/include/caosdb/filestreaming/RequestStatus.h deleted file mode 100644 index dad810da851c19d07155685b69e940e2901b5163..0000000000000000000000000000000000000000 --- a/include/caosdb/filestreaming/RequestStatus.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once - -#include <grpcpp/grpcpp.h> - -namespace FileExchange { - -namespace RequestStatus { - -extern const grpc::Status FileHeaderExpected; -extern const grpc::Status FileNameEmpty; -extern const grpc::Status FileChunkExpected; -extern const grpc::Status FileNotFound; -extern const grpc::Status FileLocked; -extern const grpc::Status FileIOError; -extern const grpc::Status UnknownError; - -} // namespace RequestStatus - -} // namespace FileExchange diff --git a/include/caosdb/filestreaming/UploadRequestHandler.h b/include/caosdb/filestreaming/UploadRequestHandler.h index 4c35f5934fdaa01151f627835a65435a22cca554..1e83c432ee64fe39faeb1a16ddae01ac514675b3 100644 --- a/include/caosdb/filestreaming/UploadRequestHandler.h +++ b/include/caosdb/filestreaming/UploadRequestHandler.h @@ -1,9 +1,14 @@ -#include "caosdb/filestreaming/HandlerInterface.h" -#include "caosdb/entity.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" -#include "caosdb/filestreaming/FileManager.h" - -#include <grpcpp/grpcpp.h> +#include "caosdb/entity.h" // for FileDescriptor +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for FileTransmissionS... +#include "caosdb/entity/v1alpha1/main.pb.h" // for FileUploadRequest +#include "caosdb/filestreaming/FileReader.h" // for FileReader +#include "caosdb/filestreaming/HandlerInterface.h" // for HandlerTag, Handl... +#include <cstdint> // for uint64_t +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWriter +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <memory> // for unique_ptr namespace FileExchange { using caosdb::entity::FileDescriptor; @@ -11,11 +16,11 @@ using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::entity::v1alpha1::FileUploadRequest; using caosdb::entity::v1alpha1::FileUploadResponse; -class UploadRequestHandler : public HandlerInterface { +class UploadRequestHandler final : public HandlerInterface { public: UploadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub, grpc::CompletionQueue *cq, - const FileDescriptor &file_descriptor); + FileDescriptor file_descriptor); ~UploadRequestHandler() override = default; @@ -61,7 +66,7 @@ protected: FileDescriptor file_descriptor_; - unsigned long long bytesToSend_; + uint64_t bytesToSend_; }; } // namespace FileExchange diff --git a/include/caosdb/protobuf_helper.h b/include/caosdb/protobuf_helper.h index 838908e157b408c4ef7eb6f437cc58c664189608..bd3395f4de7e8cd586a613869e84b1d5ffeb729b 100644 --- a/include/caosdb/protobuf_helper.h +++ b/include/caosdb/protobuf_helper.h @@ -22,8 +22,8 @@ #ifndef CAOSDB_PROTOBUF_HELPER_H #define CAOSDB_PROTOBUF_HELPER_H -#include <google/protobuf/arena.h> -#include <google/protobuf/util/json_util.h> // for MessageToJsonString, Jso... +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/extension_set.h> // for Arena #define CAOSDB_DEBUG_MESSAGE_STRING(message, out) \ std::string out; \ diff --git a/include/caosdb/transaction.h b/include/caosdb/transaction.h index 3fd1e52649de4536a594542426dc12cf2d720092..9073b73752119fc29222589747d2cb65c1c54a74 100644 --- a/include/caosdb/transaction.h +++ b/include/caosdb/transaction.h @@ -20,24 +20,26 @@ */ #ifndef CAOSDB_TRANSACTION_H #define CAOSDB_TRANSACTION_H -#include "boost/log/core/record.hpp" // for record -#include "boost/log/sources/record_ostream.hpp" // for basic_record_o... -#include "boost/preprocessor/seq/limits/enum_256.hpp" // for BOOST_PP_SEQ_E... -#include "boost/preprocessor/seq/limits/size_256.hpp" // for BOOST_PP_SEQ_S... -#include "caosdb/entity.h" // for Entity -#include "caosdb/logging.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionSe... -#include "caosdb/entity/v1alpha1/main.pb.h" // for Entity, RetrieveReq... -#include "caosdb/transaction_status.h" // for TransactionStatus -#include "caosdb/status_code.h" // for StatusCode -#include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso... -#include <iterator> + +#include "caosdb/entity.h" // for Entity, FileDe... +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransact... +#include "caosdb/entity/v1alpha1/main.pb.h" // for MultiTransacti... +#include "caosdb/logging.h" // for CAOSDB_LOG_ERR... +#include "caosdb/status_code.h" // for StatusCode +#include "caosdb/transaction_status.h" // for StatusCode +#include <boost/log/core/record.hpp> // for record +#include <boost/log/sources/record_ostream.hpp> // for basic_record_o... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_E... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_S... // IWYU pragma: no_include <ext/alloc_traits.h> -#include <memory> // for shared_ptr, unique_ptr -#include <stdexcept> // for out_of_range -#include <string> // for string -#include <utility> // for move -#include <vector> // for vector +#include <google/protobuf/util/json_util.h> // for MessageToJsonS... +#include <iterator> // for iterator, next +#include <map> // for map +#include <memory> // for unique_ptr +#include <stdexcept> // for out_of_range +#include <string> // for string +#include <utility> // for move +#include <vector> // for vector /** * Do all necessary checks and assure that another retrieval (by id or by diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7d35fadadd677321f9809d3d1ee2dfcec0002909..9f447d60162f7b49125500767508af42c64d8fb5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -31,10 +31,8 @@ set(libcaosdb_SRC ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/Client.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/UploadRequestHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/DownloadRequestHandler.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/RequestStatus.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileWriter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileReader.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileManager.cpp ) # pass variable to parent scope diff --git a/src/caosdb/entity.cpp b/src/caosdb/entity.cpp index 23984f25d0f7c9eaed7c7d78f788d35956f1c96e..dc67f7df69985f5e0d0cefa9ea7f7820b8bdb7ef 100644 --- a/src/caosdb/entity.cpp +++ b/src/caosdb/entity.cpp @@ -20,9 +20,10 @@ * */ #include "caosdb/entity.h" -#include "caosdb/entity/v1alpha1/main.pb.h" // for Parent, Arena::CreateMay... +#include "caosdb/entity/v1alpha1/main.pb.h" // for RepeatedPtrField, Property #include "caosdb/protobuf_helper.h" // for get_arena -#include "google/protobuf/arena.h" // for Arena +#include <google/protobuf/arena.h> // for Arena +#include <new> // for operator new namespace caosdb::entity { using caosdb::entity::v1alpha1::IdResponse; diff --git a/src/caosdb/filestreaming/Client.cpp b/src/caosdb/filestreaming/Client.cpp index 4bfc55fad6e65d5fe91695eb98b48513bb686a7a..8c668da379744a5765bcb03a4120337a8078fd04 100644 --- a/src/caosdb/filestreaming/Client.cpp +++ b/src/caosdb/filestreaming/Client.cpp @@ -1,8 +1,15 @@ #include "caosdb/filestreaming/Client.h" -#include "caosdb/logging.h" -#include "caosdb/status_code.h" -#include "caosdb/filestreaming/DownloadRequestHandler.h" -#include "caosdb/filestreaming/UploadRequestHandler.h" +#include "caosdb/filestreaming/DownloadRequestHandler.h" // for DownloadReq... +#include "caosdb/filestreaming/UploadRequestHandler.h" // for UploadReque... +#include "caosdb/logging.h" // for CAOSDB_LOG_... +#include "caosdb/status_code.h" // for StatusCode +#include <boost/log/core/record.hpp> // for record +#include <boost/log/sources/record_ostream.hpp> // for basic_recor... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SE... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SE... +#include <exception> // IWYU pragma: keep +// IWYU pragma: no_include <bits/exception.h> +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQ... namespace FileExchange { using caosdb::StatusCode; @@ -15,8 +22,9 @@ FileExchangeClient::~FileExchangeClient() { // drain the queue void *ignoredTag = nullptr; bool ok = false; - while (cq_.Next(&ignoredTag, &ok)) + while (cq_.Next(&ignoredTag, &ok)) { ; + } } StatusCode FileExchangeClient::upload(const FileDescriptor &file_descriptor) { @@ -53,11 +61,11 @@ int FileExchangeClient::processMessages() { bool ok = false; while (true) { if (cq_.Next(&tag, &ok)) { - if (tag) { - // TODO assert + if (tag != nullptr) { + // TODO(tf): assert auto res = handler_->onNext(ok); if (!res) { - // TODO comment + // TODO(tf): comment handler_.reset(); break; } diff --git a/src/caosdb/filestreaming/DownloadRequestHandler.cpp b/src/caosdb/filestreaming/DownloadRequestHandler.cpp index 74be7e7207a67130cc0c823f5fd5b2ca0565b6a7..6b59210a6525ff823942e56624f6c227f7660060 100644 --- a/src/caosdb/filestreaming/DownloadRequestHandler.cpp +++ b/src/caosdb/filestreaming/DownloadRequestHandler.cpp @@ -1,10 +1,26 @@ #include "caosdb/filestreaming/DownloadRequestHandler.h" -#include "caosdb/protobuf_helper.h" -#include "caosdb/exceptions.h" -#include "caosdb/status_code.h" -#include "caosdb/logging.h" - -#include <iostream> +#include "caosdb/exceptions.h" // for Exception +#include "caosdb/logging.h" // for CAOSDB_LOG_TRACE +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for GENERIC_RPC_E... +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <exception> // IWYU pragma: keep +// IWYU pragma: no_include <bits/exception.h> +#include <google/protobuf/arena.h> // for Arena +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncRe... +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT... +#include <iostream> // for char_traits +#include <stdexcept> // for runtime_error +#include <string> // for string, opera... +#include <utility> // for move namespace FileExchange { using caosdb::StatusCode; @@ -16,11 +32,12 @@ using google::protobuf::Arena; DownloadRequestHandler::DownloadRequestHandler( HandlerTag tag, FileTransmissionService::Stub *stub, - grpc::CompletionQueue *cq, const FileDescriptor &file_descriptor) - : tag_(tag), stub_(stub), cq_(cq), state_(CallState::NewCall), - file_descriptor_(file_descriptor), bytesReceived_(0), + grpc::CompletionQueue *cq, FileDescriptor file_descriptor) + : tag_(tag), stub_(stub), cq_(cq), request_(Arena::CreateMessage<FileDownloadRequest>(get_arena())), - response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())) { + response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())), + state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), + bytesReceived_(0) { this->onNext(true); } @@ -35,7 +52,7 @@ bool DownloadRequestHandler::onNext(bool ok) { this->handleReceivingFileState(); } else if (state_ == CallState::CallComplete) { this->handleCallCompleteState(); - return false; // TODO comment + return false; // TODO(tf): comment } } else { state_ = CallState::CallComplete; @@ -55,7 +72,7 @@ bool DownloadRequestHandler::onNext(bool ok) { } if (state_ == CallState::NewCall) { - // TODO comment + // TODO(tf): comment return false; } @@ -71,7 +88,6 @@ void DownloadRequestHandler::handleNewCallState() { << "Enter DownloadRequestHandler::handleNewCallState. local_path = " << file_descriptor_.local_path << ", download_id = " << file_descriptor_.file_transmission_id; - const std::size_t ServerDefaultChunkSize = 0; fileWriter_ = std::make_unique<FileWriter>(file_descriptor_.local_path); CAOSDB_DEBUG_MESSAGE_STRING(*request_, request_out) @@ -105,7 +121,7 @@ void DownloadRequestHandler::handleReceivingFileState() { CAOSDB_LOG_TRACE(logger_name) << "Enter DownloadRequestHandler::handleReceivingFileState"; if (response_->has_chunk()) { - auto &chunkData = response_->chunk().data(); + const auto &chunkData = response_->chunk().data(); if (chunkData.empty()) { CAOSDB_LOG_DEBUG(logger_name) << "Received an empty FileChunk, ignoring"; } else { diff --git a/src/caosdb/filestreaming/FileManager.cpp b/src/caosdb/filestreaming/FileManager.cpp deleted file mode 100644 index 47a013aa119374226b41155c4d31e4857d706b5d..0000000000000000000000000000000000000000 --- a/src/caosdb/filestreaming/FileManager.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "caosdb/filestreaming/FileManager.h" -#include "caosdb/filestreaming/FileError.h" - -using namespace FileExchange; - -using FileMutexSP = std::shared_ptr<FileMutex>; - -FileManager::FileManager(const std::string &filename) { - this->manageFile(filename); -} - -bool FileManager::isManaged(const std::string &filename) const { - std::lock_guard<std::mutex> lock(mgrMutex_); - - return fileMutexes_.count(filename) > 0; -} - -std::unique_ptr<FileReader> -FileManager::readFile(const std::string &filename) const { - FileMutexSP *mutexPtr = nullptr; - - { - std::lock_guard<std::mutex> lock(mgrMutex_); - - auto it = fileMutexes_.find(filename); - if (it == fileMutexes_.end()) { - std::string message = "File is not managed by the server: " + filename; - throw FileNotManagedError(message); - } else { - mutexPtr = &it->second; - } - } - - return std::make_unique<FileReader>(filename, *mutexPtr); -} - -std::unique_ptr<FileWriter> -FileManager::writeFile(const std::string &filename) { - FileMutexSP *mutexPtr = nullptr; - - { - std::lock_guard<std::mutex> lock(mgrMutex_); - - auto it = fileMutexes_.find(filename); - if (it == fileMutexes_.end()) { - it = fileMutexes_.emplace(filename, std::make_shared<FileMutex>()).first; - } - - mutexPtr = &it->second; - } - - return std::make_unique<FileWriter>(filename, *mutexPtr); -} - -void FileManager::manageFile(const std::string &filename) { - if (fileMutexes_.count(filename) == 0) { - fileMutexes_.emplace(filename, std::make_shared<FileMutex>()); - } -} diff --git a/src/caosdb/filestreaming/FileReader.cpp b/src/caosdb/filestreaming/FileReader.cpp index 33df9f4be4c8a33ab6f8ec5843a4b729c45c1139..c7d3648739d9965fff1e205ced5603c6864406bf 100644 --- a/src/caosdb/filestreaming/FileReader.cpp +++ b/src/caosdb/filestreaming/FileReader.cpp @@ -1,16 +1,19 @@ #include "caosdb/filestreaming/FileReader.h" -#include "caosdb/filestreaming/FileError.h" +#include "caosdb/filestreaming/FileError.h" // for FileIOError, FileLockError +#include <boost/filesystem/path.hpp> // for path +#include <mutex> // for try_to_lock +#include <utility> // for move -using namespace FileExchange; +namespace FileExchange { -FileReader::FileReader(const boost::filesystem::path &filename) - : filename_(filename), size_(0) { +FileReader::FileReader(boost::filesystem::path filename) + : filename_(std::move(filename)), size_(0) { this->openFile(); } -FileReader::FileReader(const boost::filesystem::path &filename, - const std::shared_ptr<FileMutex> &mutexPtr) - : filename_(filename), size_(0), mutexPtr_(mutexPtr) { +FileReader::FileReader(boost::filesystem::path filename, + std::shared_ptr<FileMutex> mutexPtr) + : filename_(std::move(filename)), size_(0), mutexPtr_(std::move(mutexPtr)) { this->openFile(); } @@ -50,3 +53,5 @@ std::size_t FileReader::read(std::string &buffer) { return bytesRead; } + +} // namespace FileExchange diff --git a/src/caosdb/filestreaming/FileWriter.cpp b/src/caosdb/filestreaming/FileWriter.cpp index ade045ab6c19fd608818893f4ae7f7036c1c601a..a832934726cd0b49c80000d98204ab3a1db0e6fa 100644 --- a/src/caosdb/filestreaming/FileWriter.cpp +++ b/src/caosdb/filestreaming/FileWriter.cpp @@ -1,16 +1,19 @@ #include "caosdb/filestreaming/FileWriter.h" -#include "caosdb/filestreaming/FileError.h" +#include "caosdb/filestreaming/FileError.h" // for FileIOError, FileLockError +#include <boost/filesystem/path.hpp> // for path +#include <mutex> // for try_to_lock +#include <utility> // for move -using namespace FileExchange; +namespace FileExchange { -FileWriter::FileWriter(const boost::filesystem::path &filename) - : filename_(filename) { +FileWriter::FileWriter(boost::filesystem::path filename) + : filename_(std::move(filename)) { this->openFile(); } -FileWriter::FileWriter(const boost::filesystem::path &filename, - const std::shared_ptr<FileMutex> &mutexPtr) - : filename_(filename), mutexPtr_(mutexPtr) { +FileWriter::FileWriter(boost::filesystem::path filename, + std::shared_ptr<FileMutex> mutexPtr) + : filename_(std::move(filename)), mutexPtr_(std::move(mutexPtr)) { this->openFile(); } @@ -36,3 +39,5 @@ void FileWriter::write(const std::string &buffer) { } } } + +} // namespace FileExchange diff --git a/src/caosdb/filestreaming/RequestStatus.cpp b/src/caosdb/filestreaming/RequestStatus.cpp deleted file mode 100644 index e6f730832d317e6568e76c0b378ef91df4479d36..0000000000000000000000000000000000000000 --- a/src/caosdb/filestreaming/RequestStatus.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include "caosdb/filestreaming/RequestStatus.h" - -namespace FileExchange { - -namespace RequestStatus { - -const grpc::Status FileHeaderExpected(grpc::INVALID_ARGUMENT, - "File header expected"); -const grpc::Status FileNameEmpty(grpc::INVALID_ARGUMENT, "File name is empty"); -const grpc::Status FileChunkExpected(grpc::INVALID_ARGUMENT, - "File chunk expected"); -const grpc::Status FileNotFound(grpc::NOT_FOUND, "File not found"); -const grpc::Status FileLocked(grpc::UNAVAILABLE, "File is locked"); -const grpc::Status FileIOError(grpc::ABORTED, "File IO error"); -const grpc::Status UnknownError(grpc::UNKNOWN, "Unknown error"); - -} // namespace RequestStatus - -} // namespace FileExchange diff --git a/src/caosdb/filestreaming/UploadRequestHandler.cpp b/src/caosdb/filestreaming/UploadRequestHandler.cpp index 30f4ce0d972887e64eaf93c25d9acaba19872eba..cefb63e4a39d89140623ce2aa6f08f500c3f76a2 100644 --- a/src/caosdb/filestreaming/UploadRequestHandler.cpp +++ b/src/caosdb/filestreaming/UploadRequestHandler.cpp @@ -1,11 +1,28 @@ #include "caosdb/filestreaming/UploadRequestHandler.h" -#include "caosdb/protobuf_helper.h" -#include "caosdb/exceptions.h" -#include "caosdb/status_code.h" -#include "caosdb/logging.h" - -#include <algorithm> -#include <iostream> +#include "caosdb/exceptions.h" // for Exception +#include "caosdb/logging.h" // for CAOSDB_LOG_ERROR +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for GENERIC_RPC_E... +#include <algorithm> // for min +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <cstdint> // for uint64_t +#include <exception> // IWYU pragma: keep +// IWYU pragma: no_include <bits/exception.h> +#include <google/protobuf/arena.h> // for Arena +#include <grpcpp/impl/codegen/async_stream.h> // for ClientAsyncWr... +#include <grpcpp/impl/codegen/call_op_set.h> // for WriteOptions +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT... +#include <iostream> // for endl, streamsize +#include <string> // for basic_string +#include <utility> // for move namespace FileExchange { using caosdb::StatusCode; @@ -15,13 +32,15 @@ using caosdb::exceptions::Exception; using caosdb::utility::get_arena; using google::protobuf::Arena; -UploadRequestHandler::UploadRequestHandler( - HandlerTag tag, FileTransmissionService::Stub *stub, - grpc::CompletionQueue *cq, const FileDescriptor &file_descriptor) - : tag_(tag), stub_(stub), cq_(cq), state_(CallState::NewCall), - file_descriptor_(file_descriptor), bytesToSend_(0), +UploadRequestHandler::UploadRequestHandler(HandlerTag tag, + FileTransmissionService::Stub *stub, + grpc::CompletionQueue *cq, + FileDescriptor file_descriptor) + : tag_(tag), stub_(stub), cq_(cq), request_(Arena::CreateMessage<FileUploadRequest>(get_arena())), - response_(Arena::CreateMessage<FileUploadResponse>(get_arena())) { + response_(Arena::CreateMessage<FileUploadResponse>(get_arena())), + state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), + bytesToSend_(0) { this->onNext(true); } @@ -29,7 +48,7 @@ bool UploadRequestHandler::onNext(bool ok) { try { if (state_ == CallState::CallComplete) { this->handleCallCompleteState(); - return false; // TODO comment + return false; // TODO(tf): comment } else if (ok) { if (state_ == CallState::NewCall) { this->handleNewCallState(); @@ -56,7 +75,7 @@ bool UploadRequestHandler::onNext(bool ok) { } if (state_ == CallState::NewCall) { - // TODO comment + // TODO(tf): comment return false; } @@ -93,12 +112,12 @@ void UploadRequestHandler::handleSendingHeaderState() { } void UploadRequestHandler::handleSendingFileState() { - const unsigned long long DefaultChunkSize = 4 * 1024; // 4K + const uint64_t DefaultChunkSize = 4 * 1024; // 4K auto chunkSize = std::min(DefaultChunkSize, bytesToSend_); request_->Clear(); - auto buffer = request_->mutable_chunk()->mutable_data(); + auto *buffer = request_->mutable_chunk()->mutable_data(); buffer->resize(chunkSize); fileReader_->read(*buffer); diff --git a/src/caosdb/protobuf_helper.cpp b/src/caosdb/protobuf_helper.cpp index a9ad000595285f1c0fb6402182e0d48294daa37d..e8bbd07834ead9b561c7e8769ed834527337f7a6 100644 --- a/src/caosdb/protobuf_helper.cpp +++ b/src/caosdb/protobuf_helper.cpp @@ -19,7 +19,8 @@ * */ #include "caosdb/protobuf_helper.h" -#include <google/protobuf/arena.h> +#include <google/protobuf/arena.h> // for Arena +#include <google/protobuf/extension_set.h> // for Arena namespace caosdb::utility { diff --git a/src/caosdb/transaction.cpp b/src/caosdb/transaction.cpp index 15f320ad55089510e441afaf0000c0b5045a8ff6..8122b130fc94078f76a81876fbbcae4e4fc38f58 100644 --- a/src/caosdb/transaction.cpp +++ b/src/caosdb/transaction.cpp @@ -18,25 +18,32 @@ * */ #include "caosdb/transaction.h" -#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS... -#include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest -#include "caosdb/filestreaming/Client.h" -#include "caosdb/logging.h" -#include "caosdb/protobuf_helper.h" // for get_arena -#include "caosdb/status_code.h" // for StatusCode, AUTHEN... -#include "google/protobuf/arena.h" // for Arena -#include "grpcpp/grpcpp.h" // for CompletionQueue -#include "grpcpp/impl/codegen/async_unary_call.h" // for ClientAsyncRespons... -#include "grpcpp/impl/codegen/client_context.h" // for ClientContext -#include "grpcpp/impl/codegen/completion_queue.h" // for CompletionQueue -#include "grpcpp/impl/codegen/status.h" // for Status -#include "grpcpp/impl/codegen/status_code_enum.h" // for StatusCode, UNAUTH... -#include <cassert> // for assert -#include <google/protobuf/util/json_util.h> // for MessageToJsonString, Jso... -#include <map> // for map -#include <memory> // for allocator, unique_ptr -#include <stdexcept> // for out_of_range -#include <utility> // for move +#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac... +#include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe... +#include "caosdb/filestreaming/Client.h" // for FileExchangeC... +#include "caosdb/logging.h" // for CAOSDB_LOG_FATAL +#include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for StatusCode +#include <algorithm> // for max +#include <boost/filesystem/path.hpp> // for operator<<, path +#include <boost/log/core/record.hpp> // for record +#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring... +#include <boost/log/sources/record_ostream.hpp> // for basic_record_... +#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... +#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... +#include <cassert> // for assert +#include <google/protobuf/arena.h> // for Arena +#include <grpcpp/grpcpp.h> // for CompletionQueue +#include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRe... +#include <grpcpp/impl/codegen/client_context.h> // for ClientContext +#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue +#include <grpcpp/impl/codegen/status.h> // for Status +#include <grpcpp/impl/codegen/status_code_enum.h> // for StatusCode +#include <iosfwd> // for streamsize +#include <map> // for map, operator!= +#include <memory> // for unique_ptr +#include <stdexcept> // for out_of_range +#include <utility> // for move, pair namespace caosdb { @@ -252,12 +259,13 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { this->status = TransactionStatus::EXECUTING(); // upload files first - if (upload_files.size() > 0) { + if (!upload_files.empty()) { auto *registration_response = Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); RegisterUploadFile(registration_response); - // TODO if(registration_response.status != REGISTRATION_STATUS_ACCEPTED){ + // TODO(tf): if(registration_response.status != + // REGISTRATION_STATUS_ACCEPTED){ // return StatusCode::FILE_UPLOAD_FAILED // } FileExchangeClient upload_client(file_service); @@ -267,7 +275,8 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { CAOSDB_LOG_INFO(logger_name) << "Uploading " << file_descriptor.local_path; auto file_upload_status = upload_client.upload(file_descriptor); - if (!file_upload_status == StatusCode::SUCCESS) { + if (static_cast<int>(static_cast<int>((file_upload_status) == 0) == + StatusCode::SUCCESS) != 0) { this->status = TransactionStatus::FILE_UPLOAD_ERROR(); return StatusCode::EXECUTING; } @@ -312,7 +321,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { } // file download afterwards - if (status.GetCode() == StatusCode::SUCCESS && download_files.size() > 0) { + if (status.GetCode() == StatusCode::SUCCESS && !download_files.empty()) { // run over all retrieved entities and get the download_id for (auto sub_response : *(response->mutable_responses())) { if (sub_response.wrapped_response_case() == @@ -320,7 +329,7 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { if (sub_response.retrieve_response() .entity_response() .has_download_id()) { - auto entity_response = + auto *entity_response = sub_response.mutable_retrieve_response()->mutable_entity_response(); auto entity_id = entity_response->entity().id(); download_files[entity_id].file_transmission_id = @@ -334,17 +343,19 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); RegisterUploadFile(registration_response); - // TODO if(registration_response.status != REGISTRATION_STATUS_ACCEPTED){ + // TODO(tf): if(registration_response.status != + // REGISTRATION_STATUS_ACCEPTED){ // return StatusCode::FILE_UPLOAD_FAILED // } FileExchangeClient download_client(file_service); - for (auto item : download_files) { + for (const auto &item : download_files) { auto file_descriptor(item.second); CAOSDB_DEBUG_MESSAGE_STRING(*file_descriptor.file_transmission_id, out) CAOSDB_LOG_INFO(logger_name) << "Downloading " << file_descriptor.local_path << ", " << out; auto file_download_status = download_client.download(file_descriptor); - if (!file_download_status == StatusCode::SUCCESS) { + if (static_cast<int>(static_cast<int>((file_download_status) == 0) == + StatusCode::SUCCESS) != 0) { this->status = TransactionStatus::FILE_DOWNLOAD_ERROR(); // TODO (tf) handle multiple errors? return StatusCode::EXECUTING; @@ -373,7 +384,7 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { if (!id.empty() && download_files.count(id) == 1) { const auto &local_path = download_files.at(id).local_path; const auto &unique_result_set = - dynamic_cast<const UniqueResult &>(*result_set.get()); + dynamic_cast<const UniqueResult &>(*result_set); unique_result_set.entity->SetLocalPath(local_path); } } break; diff --git a/test/test_entity.cpp b/test/test_entity.cpp index 0b81f68540a22dd4c6ed9efb6f72dd0b738edf9b..7930ab52d242a19f1833e77aedaabf34c56608c8 100644 --- a/test/test_entity.cpp +++ b/test/test_entity.cpp @@ -26,12 +26,14 @@ #include "caosdb/entity/v1alpha1/main.pb.h" // for IdResponse, Message #include "caosdb/message_code.h" // for MessageCode #include "caosdb/protobuf_helper.h" // for get_arena +#include "caosdb/status_code.h" // for StatusCode, FILE_DO... #include "caosdb/transaction.h" // for Transaction -#include "google/protobuf/arena.h" // for Arena -#include "gtest/gtest-message.h" // for Message -#include "gtest/gtest-test-part.h" // for TestPartResult, Sui... -#include "gtest/gtest_pred_impl.h" // for Test, EXPECT_EQ +#include <google/protobuf/arena.h> // for Arena +#include <gtest/gtest-message.h> // for Message +#include <gtest/gtest-test-part.h> // for TestPartResult, Sui... +#include <gtest/gtest_pred_impl.h> // for Test, EXPECT_EQ #include <memory> // for allocator, shared_ptr +#include <string> // for operator+, string namespace caosdb::entity { using caosdb::entity::v1alpha1::IdResponse;