Skip to content
Snippets Groups Projects
Verified Commit 4b921fdb authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP: files

parent 4e053068
Branches
Tags
1 merge request!11F files
Pipeline #11998 passed
Pipeline: caosdb-cppinttest

#12038

    ...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
    #include <string> // for string, opera... #include <string> // for string, opera...
    #include <utility> // for move #include <utility> // for move
    namespace FileExchange { namespace caosdb::transaction {
    using caosdb::StatusCode; using caosdb::StatusCode;
    using caosdb::exceptions::AuthenticationError; using caosdb::exceptions::AuthenticationError;
    using caosdb::exceptions::ConnectionError; using caosdb::exceptions::ConnectionError;
    ...@@ -37,11 +37,9 @@ DownloadRequestHandler::DownloadRequestHandler( ...@@ -37,11 +37,9 @@ DownloadRequestHandler::DownloadRequestHandler(
    request_(Arena::CreateMessage<FileDownloadRequest>(get_arena())), request_(Arena::CreateMessage<FileDownloadRequest>(get_arena())),
    response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())), response_(Arena::CreateMessage<FileDownloadResponse>(get_arena())),
    state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)),
    bytesReceived_(0) { bytesReceived_(0) {}
    this->onNext(true);
    }
    bool DownloadRequestHandler::onNext(bool ok) { bool DownloadRequestHandler::OnNext(bool ok) {
    try { try {
    if (ok) { if (ok) {
    if (state_ == CallState::NewCall) { if (state_ == CallState::NewCall) {
    ...@@ -81,7 +79,7 @@ bool DownloadRequestHandler::onNext(bool ok) { ...@@ -81,7 +79,7 @@ bool DownloadRequestHandler::onNext(bool ok) {
    return true; return true;
    } }
    void DownloadRequestHandler::cancel() { ctx_.TryCancel(); } void DownloadRequestHandler::Cancel() { ctx_.TryCancel(); }
    void DownloadRequestHandler::handleNewCallState() { void DownloadRequestHandler::handleNewCallState() {
    CAOSDB_LOG_TRACE(logger_name) CAOSDB_LOG_TRACE(logger_name)
    ...@@ -90,17 +88,10 @@ void DownloadRequestHandler::handleNewCallState() { ...@@ -90,17 +88,10 @@ void DownloadRequestHandler::handleNewCallState() {
    << ", download_id = " << file_descriptor_.file_transmission_id; << ", download_id = " << file_descriptor_.file_transmission_id;
    fileWriter_ = std::make_unique<FileWriter>(file_descriptor_.local_path); fileWriter_ = std::make_unique<FileWriter>(file_descriptor_.local_path);
    CAOSDB_DEBUG_MESSAGE_STRING(*request_, request_out)
    CAOSDB_LOG_TRACE(logger_name) << "HERE1" << request_out;
    CAOSDB_DEBUG_MESSAGE_STRING(*(file_descriptor_.file_transmission_id),
    file_transmission_id_out)
    CAOSDB_LOG_TRACE(logger_name) << "HERE2" << file_transmission_id_out;
    request_->mutable_file_transmission_id()->CopyFrom( request_->mutable_file_transmission_id()->CopyFrom(
    *(file_descriptor_.file_transmission_id)); *(file_descriptor_.file_transmission_id));
    CAOSDB_LOG_TRACE(logger_name) << "HERE2";
    rpc_ = stub_->PrepareAsyncFileDownload(&ctx_, *request_, cq_); rpc_ = stub_->PrepareAsyncFileDownload(&ctx_, *request_, cq_);
    CAOSDB_LOG_TRACE(logger_name) << "HERE3";
    state_ = CallState::SendingRequest; state_ = CallState::SendingRequest;
    rpc_->StartCall(tag_); rpc_->StartCall(tag_);
    ...@@ -162,4 +153,4 @@ void DownloadRequestHandler::handleCallCompleteState() { ...@@ -162,4 +153,4 @@ void DownloadRequestHandler::handleCallCompleteState() {
    << "Leave DownloadRequestHandler::handleCallCompleteState"; << "Leave DownloadRequestHandler::handleCallCompleteState";
    } }
    } // namespace FileExchange } // namespace caosdb::transaction
    ...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
    #include <mutex> // for try_to_lock #include <mutex> // for try_to_lock
    #include <utility> // for move #include <utility> // for move
    namespace FileExchange { namespace caosdb::transaction {
    FileReader::FileReader(boost::filesystem::path filename) FileReader::FileReader(boost::filesystem::path filename)
    : filename_(std::move(filename)), size_(0) { : filename_(std::move(filename)), size_(0) {
    ...@@ -54,4 +54,4 @@ std::size_t FileReader::read(std::string &buffer) { ...@@ -54,4 +54,4 @@ std::size_t FileReader::read(std::string &buffer) {
    return bytesRead; return bytesRead;
    } }
    } // namespace FileExchange } // namespace caosdb::transaction
    ...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
    #include <mutex> // for try_to_lock #include <mutex> // for try_to_lock
    #include <utility> // for move #include <utility> // for move
    namespace FileExchange { namespace caosdb::transaction {
    FileWriter::FileWriter(boost::filesystem::path filename) FileWriter::FileWriter(boost::filesystem::path filename)
    : filename_(std::move(filename)) { : filename_(std::move(filename)) {
    ...@@ -40,4 +40,4 @@ void FileWriter::write(const std::string &buffer) { ...@@ -40,4 +40,4 @@ void FileWriter::write(const std::string &buffer) {
    } }
    } }
    } // namespace FileExchange } // namespace caosdb::transaction
    MIT License
    Copyright (c) 2019 NeiRo21
    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:
    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.
    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
    ...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
    #include <string> // for basic_string #include <string> // for basic_string
    #include <utility> // for move #include <utility> // for move
    namespace FileExchange { namespace caosdb::transaction {
    using caosdb::StatusCode; using caosdb::StatusCode;
    using caosdb::exceptions::AuthenticationError; using caosdb::exceptions::AuthenticationError;
    using caosdb::exceptions::ConnectionError; using caosdb::exceptions::ConnectionError;
    ...@@ -40,11 +40,9 @@ UploadRequestHandler::UploadRequestHandler(HandlerTag tag, ...@@ -40,11 +40,9 @@ UploadRequestHandler::UploadRequestHandler(HandlerTag tag,
    request_(Arena::CreateMessage<FileUploadRequest>(get_arena())), request_(Arena::CreateMessage<FileUploadRequest>(get_arena())),
    response_(Arena::CreateMessage<FileUploadResponse>(get_arena())), response_(Arena::CreateMessage<FileUploadResponse>(get_arena())),
    state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)), state_(CallState::NewCall), file_descriptor_(std::move(file_descriptor)),
    bytesToSend_(0) { bytesToSend_(0) {}
    this->onNext(true);
    }
    bool UploadRequestHandler::onNext(bool ok) { bool UploadRequestHandler::OnNext(bool ok) {
    try { try {
    if (state_ == CallState::CallComplete) { if (state_ == CallState::CallComplete) {
    this->handleCallCompleteState(); this->handleCallCompleteState();
    ...@@ -84,7 +82,7 @@ bool UploadRequestHandler::onNext(bool ok) { ...@@ -84,7 +82,7 @@ bool UploadRequestHandler::onNext(bool ok) {
    return true; return true;
    } }
    void UploadRequestHandler::cancel() { ctx_.TryCancel(); } void UploadRequestHandler::Cancel() { ctx_.TryCancel(); }
    void UploadRequestHandler::handleNewCallState() { void UploadRequestHandler::handleNewCallState() {
    auto filename = file_descriptor_.local_path; auto filename = file_descriptor_.local_path;
    ...@@ -157,4 +155,4 @@ void UploadRequestHandler::handleCallCompleteState() { ...@@ -157,4 +155,4 @@ void UploadRequestHandler::handleCallCompleteState() {
    } }
    } }
    } // namespace FileExchange } // namespace caosdb::transaction
    #include "caosdb/file_transmission/register_file_upload_handler.h"
    #include "caosdb/logging.h" // for CAOSDB_LOG_TRACE
    #include <boost/log/core/record.hpp> // for record
    #include <boost/log/sources/record_ostream.hpp> // for basic_record_...
    #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_...
    #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_...
    #include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    namespace caosdb::transaction {
    RegisterFileUploadHandler::~RegisterFileUploadHandler() = default;
    RegisterFileUploadHandler::RegisterFileUploadHandler(
    HandlerTag tag, FileTransmissionService::Stub *stub,
    grpc::CompletionQueue *completion_queue, RegisterFileUploadRequest *request,
    RegisterFileUploadResponse *response)
    : UnaryRpcHandler(completion_queue), tag_(tag), stub_(stub),
    request_(request), response_(response) {}
    void RegisterFileUploadHandler::handleNewCallState() {
    CAOSDB_LOG_TRACE(logger_name)
    << "Enter RegisterFileUploadHandler::handleNewCallState.";
    rpc_ = stub_->PrepareAsyncRegisterFileUpload(&call_context, *request_,
    completion_queue);
    state_ = CallState::CallComplete;
    rpc_->StartCall();
    rpc_->Finish(response_, &status_, tag_);
    CAOSDB_LOG_TRACE(logger_name)
    << "Leave RegisterFileUploadHandler::handleNewCallState";
    }
    void RegisterFileUploadHandler::handleReceivingFileState() {}
    } // namespace caosdb::transaction
    ...@@ -21,9 +21,11 @@ ...@@ -21,9 +21,11 @@
    #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac... #include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac...
    #include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe... #include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe...
    #include "caosdb/file_transmission/Client.h" // for FileExchangeC... #include "caosdb/file_transmission/Client.h" // for FileExchangeC...
    #include "caosdb/file_transmission/register_file_upload_handler.h"
    #include "caosdb/logging.h" // for CAOSDB_LOG_FATAL #include "caosdb/logging.h" // for CAOSDB_LOG_FATAL
    #include "caosdb/protobuf_helper.h" // for get_arena #include "caosdb/protobuf_helper.h" // for get_arena
    #include "caosdb/status_code.h" // for StatusCode #include "caosdb/status_code.h" // for StatusCode
    #include "caosdb/transaction_handler.h"
    #include <algorithm> // for max #include <algorithm> // for max
    #include <boost/filesystem/path.hpp> // for operator<<, path #include <boost/filesystem/path.hpp> // for operator<<, path
    #include <boost/log/core/record.hpp> // for record #include <boost/log/core/record.hpp> // for record
    ...@@ -31,14 +33,11 @@ ...@@ -31,14 +33,11 @@
    #include <boost/log/sources/record_ostream.hpp> // for basic_record_... #include <boost/log/sources/record_ostream.hpp> // for basic_record_...
    #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_... #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_...
    #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_... #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_...
    #include <cassert> // for assert // IWYU pragma: no_include <bits/exception.h>
    #include <exception> // IWYU pragma: keep
    #include <google/protobuf/arena.h> // for Arena #include <google/protobuf/arena.h> // for Arena
    #include <grpcpp/grpcpp.h> // for CompletionQueue #include <grpc/impl/codegen/gpr_types.h>
    #include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRe...
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/impl/codegen/status_code_enum.h> // for StatusCode
    #include <iosfwd> // for streamsize #include <iosfwd> // for streamsize
    #include <map> // for map, operator!= #include <map> // for map, operator!=
    #include <memory> // for unique_ptr #include <memory> // for unique_ptr
    ...@@ -116,12 +115,10 @@ using TransactionResponseCase = ...@@ -116,12 +115,10 @@ using TransactionResponseCase =
    caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase; caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase;
    using RetrieveResponseCase = using RetrieveResponseCase =
    caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase; caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase;
    using caosdb::utility::get_arena;
    using grpc::ClientAsyncResponseReader;
    using ProtoEntity = caosdb::entity::v1alpha1::Entity; using ProtoEntity = caosdb::entity::v1alpha1::Entity;
    using FileExchange::FileExchangeClient;
    using google::protobuf::Arena; using google::protobuf::Arena;
    using grpc::CompletionQueue; using NextStatus = grpc::CompletionQueue::NextStatus;
    using RegistrationStatus = caosdb::entity::v1alpha1::RegistrationStatus;
    ResultSet::iterator::iterator(const ResultSet *result_set_param, int index) ResultSet::iterator::iterator(const ResultSet *result_set_param, int index)
    : current_index(index), result_set(result_set_param) {} : current_index(index), result_set(result_set_param) {}
    ...@@ -159,8 +156,8 @@ MultiResultSet::MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set) ...@@ -159,8 +156,8 @@ MultiResultSet::MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set)
    Transaction::Transaction( Transaction::Transaction(
    std::shared_ptr<EntityTransactionService::Stub> entity_service, std::shared_ptr<EntityTransactionService::Stub> entity_service,
    std::shared_ptr<FileTransmissionService::Stub> file_service) std::shared_ptr<FileTransmissionService::Stub> file_service)
    : request(Arena::CreateMessage<MultiTransactionRequest>(get_arena())), : request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())),
    response(Arena::CreateMessage<MultiTransactionResponse>(get_arena())) { response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())) {
    this->entity_service = std::move(entity_service); this->entity_service = std::move(entity_service);
    this->file_service = std::move(file_service); this->file_service = std::move(file_service);
    this->query_count = -1; this->query_count = -1;
    ...@@ -285,14 +282,26 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { ...@@ -285,14 +282,26 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    // upload files first // upload files first
    if (!upload_files.empty()) { if (!upload_files.empty()) {
    CAOSDB_LOG_INFO(logger_name)
    << "Number of files to be uploaded: " << upload_files.size();
    // TODO(tf): Use Arena
    auto *registration_request =
    Arena::CreateMessage<RegisterFileUploadRequest>(GetArena());
    auto *registration_response = auto *registration_response =
    Arena::CreateMessage<RegisterFileUploadResponse>(get_arena()); Arena::CreateMessage<RegisterFileUploadResponse>(GetArena());
    RegisterUploadFile(registration_response);
    handler_ = std::make_unique<RegisterFileUploadHandler>(
    &handler_, file_service.get(), &completion_queue, registration_request,
    registration_response);
    this->status = ProcessCalls();
    if (registration_response->status() !=
    RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) {
    this->status = TransactionStatus::FILE_UPLOAD_ERROR();
    return StatusCode::EXECUTING;
    }
    // TODO(tf): if(registration_response.status !=
    // REGISTRATION_STATUS_ACCEPTED){
    // return StatusCode::FILE_UPLOAD_FAILED
    // }
    FileExchangeClient upload_client(file_service); FileExchangeClient upload_client(file_service);
    for (auto file_descriptor : upload_files) { for (auto file_descriptor : upload_files) {
    file_descriptor.file_transmission_id->set_registration_id( file_descriptor.file_transmission_id->set_registration_id(
    ...@@ -300,50 +309,17 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { ...@@ -300,50 +309,17 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    CAOSDB_LOG_INFO(logger_name) CAOSDB_LOG_INFO(logger_name)
    << "Uploading " << file_descriptor.local_path; << "Uploading " << file_descriptor.local_path;
    auto file_upload_status = upload_client.upload(file_descriptor); auto file_upload_status = upload_client.upload(file_descriptor);
    if (static_cast<int>(static_cast<int>((file_upload_status) == 0) == if (file_upload_status != StatusCode::SUCCESS) {
    StatusCode::SUCCESS) != 0) {
    this->status = TransactionStatus::FILE_UPLOAD_ERROR(); this->status = TransactionStatus::FILE_UPLOAD_ERROR();
    return StatusCode::EXECUTING; return StatusCode::EXECUTING;
    } }
    } }
    } }
    grpc::Status grpc_status; handler_ = std::make_unique<EntityTransactionHandler>(
    CompletionQueue cq; &handler_, entity_service.get(), &completion_queue, request, response);
    grpc::ClientContext context; this->status = ProcessCalls();
    std::unique_ptr<ClientAsyncResponseReader<MultiTransactionResponse>> rpc(
    this->entity_service->PrepareAsyncMultiTransaction(&context,
    *(this->request), &cq));
    rpc->StartCall();
    int tag = 1;
    void *send_tag = static_cast<void *>(&tag);
    rpc->Finish(this->response, &grpc_status, send_tag);
    void *recv_tag = nullptr;
    bool ok = false;
    // TODO(tf) make this actually asynchronous by moving this to WaitForIt()
    cq.Next(&recv_tag, &ok);
    assert(recv_tag == send_tag);
    assert(ok);
    if (!grpc_status.ok()) {
    switch (grpc_status.error_code()) {
    case grpc::StatusCode::UNAUTHENTICATED:
    this->status = TransactionStatus::AUTHENTICATION_ERROR();
    break;
    case grpc::StatusCode::UNAVAILABLE:
    this->status = TransactionStatus::CONNECTION_ERROR();
    break;
    default:
    auto error_details = std::to_string(grpc_status.error_code()) + " - " +
    grpc_status.error_message();
    this->status = TransactionStatus::RPC_ERROR(error_details);
    }
    } else {
    this->status = TransactionStatus::SUCCESS();
    }
    // file download afterwards // file download afterwards
    if (status.GetCode() == StatusCode::SUCCESS && !download_files.empty()) { if (status.GetCode() == StatusCode::SUCCESS && !download_files.empty()) {
    ...@@ -364,14 +340,6 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { ...@@ -364,14 +340,6 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    } }
    } }
    auto *registration_response =
    Arena::CreateMessage<RegisterFileUploadResponse>(get_arena());
    RegisterUploadFile(registration_response);
    // TODO(tf): if(registration_response.status !=
    // REGISTRATION_STATUS_ACCEPTED){
    // return StatusCode::FILE_UPLOAD_FAILED
    // }
    FileExchangeClient download_client(file_service); FileExchangeClient download_client(file_service);
    for (const auto &item : download_files) { for (const auto &item : download_files) {
    auto file_descriptor(item.second); auto file_descriptor(item.second);
    ...@@ -379,10 +347,8 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { ...@@ -379,10 +347,8 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
    CAOSDB_LOG_INFO(logger_name) CAOSDB_LOG_INFO(logger_name)
    << "Downloading " << file_descriptor.local_path << ", " << out; << "Downloading " << file_descriptor.local_path << ", " << out;
    auto file_download_status = download_client.download(file_descriptor); auto file_download_status = download_client.download(file_descriptor);
    if (static_cast<int>(static_cast<int>((file_download_status) == 0) == if (file_download_status != StatusCode::SUCCESS) {
    StatusCode::SUCCESS) != 0) {
    this->status = TransactionStatus::FILE_DOWNLOAD_ERROR(); this->status = TransactionStatus::FILE_DOWNLOAD_ERROR();
    // TODO (tf) handle multiple errors?
    return StatusCode::EXECUTING; return StatusCode::EXECUTING;
    } }
    } }
    ...@@ -483,28 +449,75 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus { ...@@ -483,28 +449,75 @@ auto Transaction::WaitForIt() const noexcept -> TransactionStatus {
    return this->status; return this->status;
    } }
    auto Transaction::RegisterUploadFile(RegisterFileUploadResponse *response) auto Transaction::ProcessCalls() -> TransactionStatus {
    -> void { gpr_timespec deadline;
    grpc::Status grpc_status; deadline.tv_sec = 1;
    CompletionQueue cq; deadline.tv_nsec = 0;
    deadline.clock_type = gpr_clock_type::GPR_TIMESPAN;
    RegisterFileUploadRequest request; TransactionStatus result = TransactionStatus::EXECUTING();
    handler_->Start();
    void *tag = nullptr;
    bool ok = false;
    while (true) {
    switch (completion_queue.AsyncNext(&tag, &ok, deadline)) {
    case NextStatus::GOT_EVENT: {
    if (tag != nullptr) {
    auto res = handler_->OnNext(ok);
    if (!res) {
    // The handler has finished it's work
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    }
    } else {
    std::string description("Invalid tag delivered by notification queue.");
    CAOSDB_LOG_ERROR(logger_name) << description;
    handler_.reset();
    return TransactionStatus::RPC_ERROR(description);
    }
    } break;
    case NextStatus::SHUTDOWN: {
    CAOSDB_LOG_ERROR(logger_name)
    << "Notification queue has been shut down unexpectedly.";
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    } break;
    case NextStatus::TIMEOUT: {
    CAOSDB_LOG_DEBUG(logger_name) << "Timeout, waiting...";
    } break;
    default:
    CAOSDB_LOG_FATAL(logger_name)
    << "Got an invalid NextStatus from CompletionQueue.";
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    }
    }
    result = handler_->GetStatus();
    handler_.reset();
    return result;
    }
    grpc::ClientContext context; Transaction::~Transaction() {
    std::unique_ptr<ClientAsyncResponseReader<RegisterFileUploadResponse>> rpc( this->Cancel();
    this->file_service->PrepareAsyncRegisterFileUpload(&context, request, &cq));
    rpc->StartCall(); completion_queue.Shutdown();
    int tag = 1; // drain the queue
    void *send_tag = static_cast<void *>(&tag); void *ignoredTag = nullptr;
    rpc->Finish(response, &grpc_status, send_tag);
    void *recv_tag = nullptr;
    bool ok = false; bool ok = false;
    while (completion_queue.Next(&ignoredTag, &ok)) {
    ;
    }
    }
    cq.Next(&recv_tag, &ok); void Transaction::Cancel() {
    assert(recv_tag == send_tag); // TODO(tf) State Canceled
    assert(ok); if (handler_ != nullptr) {
    handler_->Cancel();
    }
    } }
    } // namespace caosdb::transaction } // namespace caosdb::transaction
    #include "caosdb/transaction_handler.h"
    #include "caosdb/logging.h" // for CAOSDB_LOG_TRACE
    #include <boost/log/core/record.hpp> // for record
    #include <boost/log/sources/record_ostream.hpp> // for basic_record_...
    #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_...
    #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_...
    #include <exception> // IWYU pragma: keep
    // IWYU pragma: no_include <bits/exception.h>
    #include <grpcpp/impl/codegen/async_unary_call.h> // for ClientAsyncRes...
    #include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
    namespace caosdb::transaction {
    EntityTransactionHandler::EntityTransactionHandler(
    HandlerTag tag, EntityTransactionService::Stub *stub,
    grpc::CompletionQueue *completion_queue, MultiTransactionRequest *request,
    MultiTransactionResponse *response)
    : UnaryRpcHandler(completion_queue), tag_(tag), stub_(stub),
    request_(request), response_(response) {}
    void EntityTransactionHandler::handleNewCallState() {
    CAOSDB_LOG_TRACE(logger_name)
    << "Enter EntityTransactionHandler::handleNewCallState with "
    "CompletionQueue "
    << completion_queue;
    rpc_ = stub_->PrepareAsyncMultiTransaction(&call_context, *request_,
    completion_queue);
    state_ = CallState::CallComplete;
    rpc_->StartCall();
    rpc_->Finish(response_, &status_, tag_);
    CAOSDB_LOG_TRACE(logger_name)
    << "Leave EntityTransactionHandler::handleNewCallState";
    }
    void EntityTransactionHandler::handleReceivingFileState() {
    // TODO(tf) remove
    }
    } // namespace caosdb::transaction
    #include "caosdb/unary_rpc_handler.h"
    #include "caosdb/logging.h" // for CAOSDB_LOG_TRACE
    #include "caosdb/status_code.h" // for GENERIC_RPC_E...
    #include <boost/log/core/record.hpp> // for record
    #include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring...
    #include <boost/log/sources/record_ostream.hpp> // for basic_record_...
    #include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_...
    #include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_...
    // IWYU pragma: no_include <bits/exception.h>
    #include <exception> // IWYU pragma: keep
    #include <grpcpp/impl/codegen/client_context.h> // for ClientContext
    #include <grpcpp/impl/codegen/status.h> // for Status
    #include <grpcpp/impl/codegen/status_code_enum.h> // for OK, UNAUTHENT...
    #include <iosfwd> // for streamsize
    #include <string> // for string, opera...
    namespace caosdb::transaction {
    bool UnaryRpcHandler::OnNext(bool ok) {
    try {
    if (ok) {
    if (state_ == CallState::NewCall) {
    this->handleNewCallState();
    } else if (state_ == CallState::ReceivingFile) {
    this->handleReceivingFileState();
    } else if (state_ == CallState::CallComplete) {
    this->handleCallCompleteState();
    return false;
    }
    } else {
    CAOSDB_LOG_ERROR(logger_name)
    << "UnaryRpcHandler::OnNext(false)!. This should not happen.";
    // TODO(tf) Handle this error:
    // in CallComplete state: "Client-side Finish: ok should always be true"
    // in ReceivingFile state: "ok indicates that the RPC is going to go to
    // the wire. If it is false, it not going to the wire. This would happen
    // if the channel is either permanently broken or transiently broken but
    // with the fail-fast option. (Note that async unary RPCs don't post a CQ
    // tag at this point, nor do client-streaming or bidi-streaming RPCs that
    // have the initial metadata corked option set.)"
    }
    return true;
    } catch (std::exception &e) {
    CAOSDB_LOG_ERROR(logger_name)
    << "UnaryRpcHandler caught an exception: " << e.what();
    transaction_status = TransactionStatus::GENERIC_ERROR(e.what());
    state_ = CallState::CallComplete;
    } catch (...) {
    CAOSDB_LOG_ERROR(logger_name)
    << "Transaction error: unknown exception caught";
    transaction_status = TransactionStatus::GENERIC_ERROR(
    "UnaryRpcHandler caught an unknown exception");
    state_ = CallState::CallComplete;
    }
    if (state_ != CallState::NewCall) {
    call_context.TryCancel();
    }
    return false;
    }
    void UnaryRpcHandler::Cancel() { call_context.TryCancel(); }
    void UnaryRpcHandler::handleCallCompleteState() {
    CAOSDB_LOG_TRACE(logger_name)
    << "Enter UnaryRpcHandler::handleCallCompleteState";
    switch (status_.error_code()) {
    case grpc::OK:
    transaction_status = TransactionStatus::SUCCESS();
    CAOSDB_LOG_INFO(logger_name) << "UnaryRpcHandler finished successfully.";
    break;
    default:
    auto code(static_cast<StatusCode>(status_.error_code()));
    std::string description(get_status_description(code) +
    " Original message: " + status_.error_message());
    transaction_status = TransactionStatus(code, description);
    CAOSDB_LOG_ERROR(logger_name)
    << "UnaryRpcHandler finished with an error (Code " << code
    << "): " << description;
    break;
    }
    CAOSDB_LOG_TRACE(logger_name)
    << "Leave UnaryRpcHandler::handleCallCompleteState";
    }
    } // namespace caosdb::transaction
    ...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
    namespace fs = boost::filesystem; namespace fs = boost::filesystem;
    namespace FileExchange { namespace caosdb::transaction {
    class test_file_transmission : public ::testing::Test { class test_file_transmission : public ::testing::Test {
    protected: protected:
    ...@@ -41,4 +41,4 @@ TEST_F(test_file_transmission, test_file_writer_reader) { ...@@ -41,4 +41,4 @@ TEST_F(test_file_transmission, test_file_writer_reader) {
    } }
    } }
    } // namespace FileExchange } // namespace caosdb::transaction
    ...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
    #include "caosdb/exceptions.h" // for ConnectionError #include "caosdb/exceptions.h" // for ConnectionError
    #include "caosdb/status_code.h" #include "caosdb/status_code.h"
    #include "caosdb/transaction.h" // for Transaction #include "caosdb/transaction.h" // for Transaction
    #include "caosdb/transaction_handler.h" // for MultiTransactionResponse
    #include "caosdb/transaction_status.h" // for ConnectionError #include "caosdb/transaction_status.h" // for ConnectionError
    #include "caosdb_test_utility.h" // for EXPECT_THROW_MESSAGE #include "caosdb_test_utility.h" // for EXPECT_THROW_MESSAGE
    #include "gtest/gtest-message.h" // for Message #include "gtest/gtest-message.h" // for Message
    ...@@ -39,10 +40,23 @@ namespace caosdb::transaction { ...@@ -39,10 +40,23 @@ namespace caosdb::transaction {
    using caosdb::configuration::InsecureConnectionConfiguration; using caosdb::configuration::InsecureConnectionConfiguration;
    using caosdb::connection::Connection; using caosdb::connection::Connection;
    using caosdb::entity::Entity; using caosdb::entity::Entity;
    using caosdb::exceptions::ConnectionError;
    using ProtoEntity = caosdb::entity::v1alpha1::Entity; using ProtoEntity = caosdb::entity::v1alpha1::Entity;
    using caosdb::entity::v1alpha1::RetrieveResponse; using caosdb::entity::v1alpha1::RetrieveResponse;
    TEST(test_transaction, create_transaction) {
    const auto *host = "localhost";
    auto configuration = InsecureConnectionConfiguration(host, 8000);
    Connection connection(configuration);
    auto transaction = connection.CreateTransaction();
    ASSERT_EQ(StatusCode::GO_ON, transaction->RetrieveById("100"));
    EXPECT_THROW_MESSAGE(
    transaction->Execute(), ConnectionError,
    "The attempt to execute this transaction was not successful because the "
    "connection to the server could not be established. "
    "Original message: failed to connect to all addresses");
    }
    TEST(test_transaction, test_multi_result_set) { TEST(test_transaction, test_multi_result_set) {
    std::vector<std::unique_ptr<Entity>> entities; std::vector<std::unique_ptr<Entity>> entities;
    for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
    ...@@ -64,19 +78,6 @@ TEST(test_transaction, test_multi_result_set) { ...@@ -64,19 +78,6 @@ TEST(test_transaction, test_multi_result_set) {
    EXPECT_EQ(counter, 5); EXPECT_EQ(counter, 5);
    } }
    TEST(test_transaction, create_transaction) {
    const auto *host = "localhost";
    auto configuration = InsecureConnectionConfiguration(host, 8000);
    Connection connection(configuration);
    auto transaction = connection.CreateTransaction();
    ASSERT_EQ(StatusCode::GO_ON, transaction->RetrieveById("100"));
    EXPECT_THROW_MESSAGE(
    transaction->Execute(), ConnectionError,
    "The attempt to execute this transaction was not successful because the "
    "connection to the server could not be established.");
    }
    TEST(test_transaction, test_unavailable) { TEST(test_transaction, test_unavailable) {
    const auto *host = "localhost"; const auto *host = "localhost";
    auto configuration = InsecureConnectionConfiguration(host, 8000); auto configuration = InsecureConnectionConfiguration(host, 8000);
    ...@@ -217,9 +218,9 @@ TEST(test_transaction, test_insert_with_file) { ...@@ -217,9 +218,9 @@ TEST(test_transaction, test_insert_with_file) {
    entity.SetRole("File"); entity.SetRole("File");
    entity.SetLocalPath(TEST_DATA_DIR + "/test.json"); entity.SetLocalPath(TEST_DATA_DIR + "/test.json");
    EXPECT_TRUE(transaction->upload_files.empty()); EXPECT_TRUE(transaction->GetUploadFiles().empty());
    transaction->InsertEntity(&entity); transaction->InsertEntity(&entity);
    EXPECT_EQ(transaction->upload_files.size(), 1); EXPECT_EQ(transaction->GetUploadFiles().size(), 1);
    transaction->ExecuteAsynchronously(); transaction->ExecuteAsynchronously();
    EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR); EXPECT_EQ(transaction->GetStatus().GetCode(), StatusCode::FILE_UPLOAD_ERROR);
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment