Skip to content
Snippets Groups Projects

F files

Merged Timm Fitschen requested to merge f-files into dev
1 file
+ 0
170
Compare changes
  • Side-by-side
  • Inline
+ 312
140
@@ -18,23 +18,32 @@
*
*/
#include "caosdb/transaction.h"
#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransactionS...
#include "caosdb/entity/v1alpha1/main.pb.h" // for SingleRetrieveRequest
#include "caosdb/logging.h"
#include "caosdb/protobuf_helper.h" // for get_arena
#include "caosdb/status_code.h" // for StatusCode, AUTHEN...
#include "google/protobuf/arena.h" // for Arena
#include "grpcpp/grpcpp.h" // for CompletionQueue
#include "grpcpp/impl/codegen/async_unary_call.h" // for ClientAsyncRespons...
#include "grpcpp/impl/codegen/client_context.h" // for ClientContext
#include "grpcpp/impl/codegen/completion_queue.h" // for CompletionQueue
#include "grpcpp/impl/codegen/status.h" // for Status
#include "grpcpp/impl/codegen/status_code_enum.h" // for StatusCode, UNAUTH...
#include <cassert> // for assert
#include <map> // for map
#include <memory> // for allocator, unique_ptr
#include "caosdb/entity/v1alpha1/main.grpc.pb.h" // for EntityTransac...
#include "caosdb/entity/v1alpha1/main.pb.h" // for TransactionRe...
#include "caosdb/file_transmission/download_request_handler.h" // Download...
#include "caosdb/file_transmission/file_reader.h" // for path
#include "caosdb/file_transmission/register_file_upload_handler.h"
#include "caosdb/file_transmission/upload_request_handler.h" // Upload...
#include "caosdb/logging.h" // for CAOSDB_LOG_FATAL
#include "caosdb/status_code.h" // for StatusCode
#include "caosdb/transaction_handler.h"
#include <algorithm> // for max
#include <boost/filesystem/path.hpp> // for operator<<, path
#include <boost/log/core/record.hpp> // for record
#include <boost/log/detail/attachable_sstream_buf.hpp> // for basic_ostring...
#include <boost/log/sources/record_ostream.hpp> // for basic_record_...
#include <boost/preprocessor/seq/limits/enum_256.hpp> // for BOOST_PP_SEQ_...
#include <boost/preprocessor/seq/limits/size_256.hpp> // for BOOST_PP_SEQ_...
// IWYU pragma: no_include <bits/exception.h>
#include <exception> // IWYU pragma: keep
#include <google/protobuf/arena.h> // for Arena
#include <grpc/impl/codegen/gpr_types.h>
#include <grpcpp/impl/codegen/completion_queue.h> // for CompletionQueue
#include <iosfwd> // for streamsize
#include <map> // for map, operator!=
#include <memory> // for unique_ptr
#include <stdexcept> // for out_of_range
#include <utility> // for move
#include <utility> // for move, pair
namespace caosdb {
@@ -79,8 +88,23 @@ auto get_status_description(int code) -> const std::string & {
"have "
"an id. This is the case when you did not retrieve it before applying any "
"changes and instantiated the Entity class explicitely."},
{StatusCode::NOT_A_FILE_ENTITY, "You can only add files to file entities."},
{StatusCode::PATH_IS_A_DIRECTORY, "The given path is a directory."},
{StatusCode::FILE_DOES_NOT_EXIST_LOCALLY,
"The file does not not exist in the local file system."},
{StatusCode::FILE_DOWNLOAD_ERROR,
"The transaction failed during the download of the files"},
{StatusCode::FILE_UPLOAD_ERROR,
"The transaction failed during the upload of the files"},
{StatusCode::UNSUPPORTED_FEATURE,
"This feature is not available in the this client implementation."}};
"This feature is not available in the this client implementation."},
{StatusCode::EXTERN_C_ASSIGNMENT_ERROR,
"You tried to assign a new object to the wrapped void pointer. You have "
"to delete the old pointee first."},
{StatusCode::OTHER_CLIENT_ERROR,
"This is code is reserved to errors raised by other clients wrapping the "
"C++ client (or its Extern C interface). This should never occur when "
"working with the C++ code itself."}};
try {
return descriptions.at(code);
} catch (const std::out_of_range &exc) {
@@ -92,22 +116,23 @@ auto get_status_description(int code) -> const std::string & {
namespace caosdb::transaction {
using caosdb::entity::v1alpha1::EntityTransactionService;
using caosdb::entity::v1alpha1::FileTransmissionService;
using caosdb::entity::v1alpha1::MultiTransactionRequest;
using caosdb::entity::v1alpha1::MultiTransactionResponse;
using WrappedResponseCase =
caosdb::entity::v1alpha1::TransactionResponse::WrappedResponseCase;
using QueryResponseCase =
caosdb::entity::v1alpha1::RetrieveResponse::QueryResponseCase;
using caosdb::utility::get_arena;
using grpc::ClientAsyncResponseReader;
using TransactionResponseCase =
caosdb::entity::v1alpha1::TransactionResponse::TransactionResponseCase;
using RetrieveResponseCase =
caosdb::entity::v1alpha1::RetrieveResponse::RetrieveResponseCase;
using ProtoEntity = caosdb::entity::v1alpha1::Entity;
using grpc::CompletionQueue;
using google::protobuf::Arena;
using NextStatus = grpc::CompletionQueue::NextStatus;
using RegistrationStatus = caosdb::entity::v1alpha1::RegistrationStatus;
ResultSet::iterator::iterator(const ResultSet *result_set_param, int index)
: current_index(index), result_set(result_set_param) {}
auto ResultSet::iterator::operator*() const -> const Entity & {
return this->result_set->At(current_index);
return this->result_set->at(current_index);
}
auto ResultSet::iterator::operator++() -> ResultSet::iterator & {
@@ -130,24 +155,19 @@ auto ResultSet::begin() const -> ResultSet::iterator {
}
auto ResultSet::end() const -> ResultSet::iterator {
return ResultSet::iterator(this, Size());
return ResultSet::iterator(this, size());
}
MultiResultSet::MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set)
: entities(std::move(result_set)) {}
[[nodiscard]] auto UniqueResult::GetEntity() const -> const Entity & {
const Entity *result = this->entity.get();
return *result;
}
: AbstractMultiResultSet(std::move(result_set)) {}
Transaction::Transaction(
std::shared_ptr<EntityTransactionService::Stub> service_stub)
: request(google::protobuf::Arena::CreateMessage<MultiTransactionRequest>(
get_arena())),
response(google::protobuf::Arena::CreateMessage<MultiTransactionResponse>(
get_arena())) {
this->service_stub = std::move(service_stub);
std::shared_ptr<EntityTransactionService::Stub> entity_service,
std::shared_ptr<FileTransmissionService::Stub> file_service)
: request(Arena::CreateMessage<MultiTransactionRequest>(GetArena())),
response(Arena::CreateMessage<MultiTransactionResponse>(GetArena())) {
this->entity_service = std::move(entity_service);
this->file_service = std::move(file_service);
this->query_count = -1;
}
@@ -161,6 +181,21 @@ auto Transaction::RetrieveById(const std::string &id) noexcept -> StatusCode {
return this->status.GetCode();
}
auto Transaction::RetrieveAndDownloadFilesById(
const std::string &id, const std::string &local_path) noexcept -> StatusCode {
ASSERT_CAN_ADD_RETRIEVAL
auto *retrieve_request =
this->request->add_requests()->mutable_retrieve_request();
retrieve_request->set_id(id);
retrieve_request->set_register_file_download(true);
download_files[id].local_path = local_path;
this->status = TransactionStatus::GO_ON();
return this->status.GetCode();
}
auto Transaction::Query(const std::string &query) noexcept -> StatusCode {
ASSERT_CAN_ADD_QUERY
@@ -187,11 +222,18 @@ auto Transaction::DeleteById(const std::string &id) noexcept -> StatusCode {
auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode {
ASSERT_CAN_ADD_INSERTION
auto *sub_request = this->request->add_requests();
auto *proto_entity = sub_request->mutable_insert_request();
auto *entity_request = this->request->add_requests()
->mutable_insert_request()
->mutable_entity_request();
auto *proto_entity = entity_request->mutable_entity();
// copy the original entity for the transaction
entity->CopyTo(proto_entity);
if (entity->HasFile()) {
auto *file_transmission_id = entity_request->mutable_upload_id();
entity->SetFileTransmissionId(file_transmission_id);
upload_files.push_back(entity->GetFileDescriptor());
}
this->status = TransactionStatus::READY();
return this->status.GetCode();
}
@@ -199,10 +241,17 @@ auto Transaction::InsertEntity(Entity *entity) noexcept -> StatusCode {
auto Transaction::UpdateEntity(Entity *entity) noexcept -> StatusCode {
ASSERT_CAN_ADD_UPDATE
auto *sub_request = this->request->add_requests();
auto *proto_entity = sub_request->mutable_update_request();
auto *entity_request = this->request->add_requests()
->mutable_update_request()
->mutable_entity_request();
auto *proto_entity = entity_request->mutable_entity();
entity->CopyTo(proto_entity);
if (entity->HasFile()) {
auto *file_transmission_id = entity_request->mutable_upload_id();
entity->SetFileTransmissionId(file_transmission_id);
upload_files.push_back(entity->GetFileDescriptor());
}
this->status = TransactionStatus::READY();
return this->status.GetCode();
}
@@ -216,7 +265,8 @@ auto Transaction::Execute() -> TransactionStatus {
return status;
}
auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
// TODO(tf) This has apparently a cognitive complexity of 39>25 (threshold).
auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode { // NOLINT
if (!IsStatus(TransactionStatus::READY()) &&
!IsStatus(TransactionStatus::GO_ON())) {
return StatusCode::TRANSACTION_STATUS_ERROR;
@@ -239,128 +289,250 @@ auto Transaction::ExecuteAsynchronously() noexcept -> StatusCode {
}
this->status = TransactionStatus::EXECUTING();
grpc::Status grpc_status;
CompletionQueue cq;
// upload files first
if (!upload_files.empty()) {
CAOSDB_LOG_INFO(logger_name)
<< "Number of files to be uploaded: " << upload_files.size();
auto *registration_request =
Arena::CreateMessage<RegisterFileUploadRequest>(GetArena());
auto *registration_response =
Arena::CreateMessage<RegisterFileUploadResponse>(GetArena());
handler_ = std::make_unique<RegisterFileUploadHandler>(
&handler_, file_service.get(), &completion_queue, registration_request,
registration_response);
this->status = ProcessCalls();
if (registration_response->status() !=
RegistrationStatus::REGISTRATION_STATUS_ACCEPTED) {
this->status = TransactionStatus::FILE_UPLOAD_ERROR();
return StatusCode::EXECUTING;
}
grpc::ClientContext context;
std::unique_ptr<ClientAsyncResponseReader<MultiTransactionResponse>> rpc(
this->service_stub->PrepareAsyncMultiTransaction(&context, *(this->request),
&cq));
rpc->StartCall();
for (auto &file_descriptor : upload_files) {
file_descriptor.file_transmission_id->set_registration_id(
registration_response->registration_id());
CAOSDB_LOG_INFO(logger_name)
<< "Uploading " << file_descriptor.local_path;
handler_ = std::make_unique<UploadRequestHandler>(
&handler_, file_service.get(), &completion_queue, file_descriptor);
this->status = ProcessCalls();
if (this->status.GetCode() != StatusCode::EXECUTING) {
return StatusCode::EXECUTING;
}
}
}
int tag = 1;
void *send_tag = static_cast<void *>(&tag);
rpc->Finish(this->response, &grpc_status, send_tag);
void *recv_tag = nullptr;
bool ok = false;
handler_ = std::make_unique<EntityTransactionHandler>(
&handler_, entity_service.get(), &completion_queue, request, response);
this->status = ProcessCalls();
if (this->status.GetCode() != StatusCode::EXECUTING) {
return StatusCode::EXECUTING;
}
// TODO(tf) make this actually asynchronous by moving this to WaitForIt()
cq.Next(&recv_tag, &ok);
assert(recv_tag == send_tag);
assert(ok);
// file download afterwards
if (status.GetCode() == StatusCode::EXECUTING && !download_files.empty()) {
// run over all retrieved entities and get the download_id
for (auto sub_response : *(response->mutable_responses())) {
if (sub_response.transaction_response_case() ==
TransactionResponseCase::kRetrieveResponse) {
if (sub_response.retrieve_response()
.entity_response()
.has_download_id()) {
auto *entity_response =
sub_response.mutable_retrieve_response()->mutable_entity_response();
auto entity_id = entity_response->entity().id();
download_files[entity_id].file_transmission_id =
entity_response->release_download_id();
// TODO(tf) handle error
}
}
}
if (!grpc_status.ok()) {
switch (grpc_status.error_code()) {
case grpc::StatusCode::UNAUTHENTICATED:
this->status = TransactionStatus::AUTHENTICATION_ERROR();
break;
case grpc::StatusCode::UNAVAILABLE:
this->status = TransactionStatus::CONNECTION_ERROR();
break;
default:
auto error_details = std::to_string(grpc_status.error_code()) + " - " +
grpc_status.error_message();
this->status = TransactionStatus::RPC_ERROR(error_details);
for (const auto &item : download_files) {
auto file_descriptor(item.second);
CAOSDB_LOG_INFO(logger_name)
<< "Downloading " << file_descriptor.local_path;
handler_ = std::make_unique<DownloadRequestHandler>(
&handler_, file_service.get(), &completion_queue, file_descriptor);
this->status = ProcessCalls();
if (this->status.GetCode() != StatusCode::EXECUTING) {
return StatusCode::EXECUTING;
}
}
} else {
this->status = TransactionStatus::SUCCESS();
}
return StatusCode::EXECUTING;
}
auto Transaction::WaitForIt() const noexcept -> TransactionStatus {
if (this->response->responses_size() == 1) {
auto *responses = this->response->mutable_responses(0);
switch (responses->wrapped_response_case()) {
case WrappedResponseCase::kRetrieveResponse: {
auto *retrieve_response = responses->mutable_retrieve_response();
switch (retrieve_response->query_response_case()) {
case QueryResponseCase::kEntity: {
auto *entity = retrieve_response->release_entity();
if (!entity->errors().empty()) {
this->status = TransactionStatus::TRANSACTION_ERROR(
"The request returned with errors.");
}
this->result_set = std::make_unique<UniqueResult>(entity);
// TODO(tf) This has apparently a cognitive complexity of 36>25 (threshold).
auto Transaction::WaitForIt() const noexcept -> TransactionStatus { // NOLINT
if (this->status.GetCode() != StatusCode::EXECUTING) {
return this->status;
}
this->status = TransactionStatus::SUCCESS();
bool set_error = false;
auto *responses = this->response->mutable_responses();
std::vector<std::unique_ptr<Entity>> entities;
for (auto &sub_response : *responses) {
std::unique_ptr<Entity> result;
switch (sub_response.transaction_response_case()) {
case TransactionResponseCase::kRetrieveResponse: {
auto *retrieve_response = sub_response.mutable_retrieve_response();
switch (retrieve_response->retrieve_response_case()) {
case RetrieveResponseCase::kEntityResponse: {
auto *retrieve_entity_response =
retrieve_response->release_entity_response();
result = std::make_unique<Entity>(retrieve_entity_response);
} break;
case QueryResponseCase::kSelectResult: {
case RetrieveResponseCase::kSelectResult: {
CAOSDB_LOG_ERROR(logger_name) << "Results of a SELECT query cannot be "
"processed by this client yet.";
// TODO(tf) Select queries
} break;
case QueryResponseCase::kCountResult: {
case RetrieveResponseCase::kCountResult: {
this->query_count = retrieve_response->count_result();
std::vector<std::unique_ptr<Entity>> entities;
this->result_set =
std::make_unique<MultiResultSet>(std::move(entities));
} break;
case RetrieveResponseCase::kFindResult: {
std::unique_ptr<Entity> find_result;
for (auto &entity_response :
*retrieve_response->mutable_find_result()->mutable_result_set()) {
find_result = std::make_unique<Entity>(&entity_response);
if (find_result->HasErrors()) {
set_error = true;
}
entities.push_back(std::move(find_result));
}
} break;
default:
// TODO(tf) Error
CAOSDB_LOG_FATAL(logger_name) << "Received invalid QueryResponseCase.";
break;
}
} break;
case WrappedResponseCase::kUpdateResponse: {
auto *updatedIdResponse = responses->mutable_update_response();
if (!updatedIdResponse->entity_errors().empty()) {
this->status = TransactionStatus::TRANSACTION_ERROR(
"The request returned with errors.");
}
this->result_set = std::make_unique<UniqueResult>(updatedIdResponse);
} break;
case WrappedResponseCase::kInsertResponse: {
auto *insertedIdResponse = responses->mutable_insert_response();
if (!insertedIdResponse->entity_errors().empty()) {
this->status = TransactionStatus::TRANSACTION_ERROR(
"The request returned with errors.");
}
this->result_set = std::make_unique<UniqueResult>(insertedIdResponse);
} break;
case WrappedResponseCase::kDeleteResponse: {
auto *deletedIdResponse = responses->mutable_delete_response();
if (!deletedIdResponse->entity_errors().empty()) {
this->status = TransactionStatus::TRANSACTION_ERROR(
"The request returned with errors.");
}
this->result_set = std::make_unique<UniqueResult>(deletedIdResponse);
} break;
break; // break TransactionResponseCase::kRetrieveResponse
}
case TransactionResponseCase::kInsertResponse: {
auto *inserted_id_response =
sub_response.mutable_insert_response()->release_id_response();
result = std::make_unique<Entity>(inserted_id_response);
break;
}
case TransactionResponseCase::kDeleteResponse: {
auto *deleted_id_response =
sub_response.mutable_delete_response()->release_id_response();
result = std::make_unique<Entity>(deleted_id_response);
break;
}
case TransactionResponseCase::kUpdateResponse: {
auto *updated_id_response =
sub_response.mutable_update_response()->release_id_response();
result = std::make_unique<Entity>(updated_id_response);
break;
}
default:
// TODO(tf) Error and Update
CAOSDB_LOG_FATAL(logger_name)
<< "Received invalid TransactionResponseCase.";
break;
}
} else {
auto *responses = this->response->mutable_responses();
std::vector<std::unique_ptr<Entity>> entities;
for (auto sub_response : *responses) {
switch (sub_response.wrapped_response_case()) {
case WrappedResponseCase::kRetrieveResponse:
entities.push_back(std::make_unique<Entity>(
sub_response.mutable_retrieve_response()->release_entity()));
break;
case WrappedResponseCase::kInsertResponse:
entities.push_back(
std::make_unique<Entity>(sub_response.release_insert_response()));
break;
case WrappedResponseCase::kDeleteResponse:
entities.push_back(
std::make_unique<Entity>(sub_response.release_insert_response()));
break;
default:
// TODO(tf) Updates
break;
if (result != nullptr) {
if (result->HasErrors()) {
set_error = true;
}
entities.push_back(std::move(result));
}
}
// copy local path of downloaded files into the entities file descriptor
for (auto &entity : entities) {
auto id = entity->GetId();
if (!id.empty() && download_files.count(id) == 1) {
const auto &local_path = download_files.at(id).local_path;
entity->SetLocalPath(local_path);
}
this->result_set = std::make_unique<MultiResultSet>(std::move(entities));
}
this->result_set = std::make_unique<MultiResultSet>(std::move(entities));
if (set_error) {
this->status = TransactionStatus::TRANSACTION_ERROR(
"The request terminated with errors.");
}
return this->status;
}
auto Transaction::ProcessCalls() -> TransactionStatus {
gpr_timespec deadline;
deadline.tv_sec = 1;
deadline.tv_nsec = 0;
deadline.clock_type = gpr_clock_type::GPR_TIMESPAN;
TransactionStatus result = TransactionStatus::EXECUTING();
handler_->Start();
void *tag = nullptr;
bool ok = false;
while (true) {
switch (completion_queue.AsyncNext(&tag, &ok, deadline)) {
case NextStatus::GOT_EVENT: {
if (tag != nullptr) {
auto res = handler_->OnNext(ok);
if (!res) {
// The handler has finished it's work
result = handler_->GetStatus();
handler_.reset();
return result;
}
} else {
std::string description("Invalid tag delivered by notification queue.");
CAOSDB_LOG_ERROR(logger_name) << description;
handler_.reset();
return TransactionStatus::RPC_ERROR(description);
}
} break;
case NextStatus::SHUTDOWN: {
CAOSDB_LOG_ERROR(logger_name)
<< "Notification queue has been shut down unexpectedly.";
result = handler_->GetStatus();
handler_.reset();
return result;
} break;
case NextStatus::TIMEOUT: {
CAOSDB_LOG_DEBUG(logger_name) << "Timeout, waiting...";
} break;
default:
CAOSDB_LOG_FATAL(logger_name)
<< "Got an invalid NextStatus from CompletionQueue.";
result = handler_->GetStatus();
handler_.reset();
return result;
}
}
result = handler_->GetStatus();
handler_.reset();
return result;
}
Transaction::~Transaction() {
this->Cancel();
completion_queue.Shutdown();
// drain the queue
void *ignoredTag = nullptr;
bool ok = false;
while (completion_queue.Next(&ignoredTag, &ok)) {
;
}
}
void Transaction::Cancel() {
// TODO(tf) State Canceled
if (handler_ != nullptr) {
handler_->Cancel();
}
}
} // namespace caosdb::transaction
Loading