Skip to content
Snippets Groups Projects
Verified Commit cba3d369 authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP: files

parent 9f011472
Branches
Tags
1 merge request!11F files
Pipeline #11649 failed
Showing
with 567 additions and 79 deletions
...@@ -36,6 +36,16 @@ set(libcaosdb_INCL ...@@ -36,6 +36,16 @@ set(libcaosdb_INCL
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_status.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction_status.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/utility.h ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/utility.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/Client.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/UploadRequestHandler.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/DownloadRequestHandler.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/HandlerInterface.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/RequestStatus.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileWriter.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileReader.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileLock.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileError.h
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileManager.h
) )
# pass variable to parent scope # pass variable to parent scope
......
...@@ -29,31 +29,35 @@ ...@@ -29,31 +29,35 @@
#ifndef CAOSDB_ENTITY_H #ifndef CAOSDB_ENTITY_H
#define CAOSDB_ENTITY_H #define CAOSDB_ENTITY_H
#include <string> // for string #include "caosdb/entity/v1alpha1/main.pb.h" // for ProtoEntity, ProtoParent...
#include "caosdb/entity/v1alpha1/main.pb.h" // for RepeatedPtrField, Message #include "caosdb/status_code.h"
#include "google/protobuf/message.h" // for RepeatedPtrField
#include "caosdb/message_code.h" // for get_message_code, Messag... #include "caosdb/message_code.h" // for get_message_code, Messag...
#include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso... #include "google/protobuf/util/json_util.h" // for MessageToJsonString, Jso...
#include "boost/filesystem/path.hpp" // for path #include <boost/filesystem/path.hpp> // for path
#include <boost/filesystem.hpp>
#include <random>
#include <string> // for string
#include <stdexcept> // for out_of_range
namespace caosdb::entity { namespace caosdb::entity {
using boost::filesystem::exists;
using boost::filesystem::is_directory;
using caosdb::entity::v1alpha1::IdResponse; using caosdb::entity::v1alpha1::IdResponse;
using ProtoParent = caosdb::entity::v1alpha1::Parent; using ProtoParent = caosdb::entity::v1alpha1::Parent;
using ProtoProperty = caosdb::entity::v1alpha1::Property; using ProtoProperty = caosdb::entity::v1alpha1::Property;
using ProtoEntity = caosdb::entity::v1alpha1::Entity; using ProtoEntity = caosdb::entity::v1alpha1::Entity;
using ProtoFileDescriptor = caosdb::entity::v1alpha1::FileDescriptor;
using ProtoMessage = caosdb::entity::v1alpha1::Message;
using caosdb::StatusCode;
using caosdb::entity::v1alpha1::EntityResponse;
using caosdb::entity::v1alpha1::FileTransmissionId; using caosdb::entity::v1alpha1::FileTransmissionId;
using google::protobuf::RepeatedPtrField;
class FileDescriptor { struct FileDescriptor {
public: FileTransmissionId *file_transmission_id;
auto GetEntityId() const -> const std::string &; ProtoFileDescriptor *wrapped;
auto GetLocalPath() const -> const boost::filesystem::path &; boost::filesystem::path local_path;
auto GetRemotePath() const -> const std::string &;
auto GetSize() const -> long long;
private:
std::string entity_id;
std::string remote_path;
std::string local_path;
long long size;
}; };
/** /**
...@@ -79,10 +83,9 @@ public: ...@@ -79,10 +83,9 @@ public:
friend class Messages; friend class Messages;
private: private:
explicit inline Message(caosdb::entity::v1alpha1::Message *wrapped) explicit inline Message(ProtoMessage *wrapped) : wrapped(wrapped){};
: wrapped(wrapped){};
caosdb::entity::v1alpha1::Message *wrapped; ProtoMessage *wrapped;
}; };
/** /**
...@@ -90,8 +93,16 @@ private: ...@@ -90,8 +93,16 @@ private:
*/ */
class Messages { class Messages {
public: public:
[[nodiscard]] inline auto Size() const -> int { return wrapped->size(); } [[nodiscard]] inline auto Size() const -> int {
if (wrapped == nullptr) {
return 0;
}
return wrapped->size();
}
[[nodiscard]] inline auto At(int index) const -> const Message { [[nodiscard]] inline auto At(int index) const -> const Message {
if (wrapped == nullptr) {
throw std::out_of_range("Number of messages: 0");
}
return Message(&(wrapped->at(index))); return Message(&(wrapped->at(index)));
} }
...@@ -103,8 +114,7 @@ public: ...@@ -103,8 +114,7 @@ public:
private: private:
inline Messages() : wrapped(nullptr){}; inline Messages() : wrapped(nullptr){};
::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Message> RepeatedPtrField<ProtoMessage> *wrapped;
*wrapped;
}; };
/** /**
...@@ -232,8 +242,7 @@ public: ...@@ -232,8 +242,7 @@ public:
private: private:
inline Parents(){}; inline Parents(){};
explicit inline Parents( explicit inline Parents(
::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Parent> RepeatedPtrField<caosdb::entity::v1alpha1::Parent> *wrapped)
*wrapped)
: wrapped(wrapped){}; : wrapped(wrapped){};
/** /**
...@@ -246,8 +255,7 @@ private: ...@@ -246,8 +255,7 @@ private:
* The collection of parent messages which serves as a backend for this * The collection of parent messages which serves as a backend for this
* class. * class.
*/ */
::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Parent> RepeatedPtrField<caosdb::entity::v1alpha1::Parent> *wrapped;
*wrapped;
}; };
/** /**
...@@ -369,8 +377,7 @@ public: ...@@ -369,8 +377,7 @@ public:
private: private:
inline Properties(){}; inline Properties(){};
explicit inline Properties( explicit inline Properties(
::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Property> RepeatedPtrField<caosdb::entity::v1alpha1::Property> *wrapped)
*wrapped)
: wrapped(wrapped){}; : wrapped(wrapped){};
/** /**
...@@ -380,8 +387,7 @@ private: ...@@ -380,8 +387,7 @@ private:
*/ */
auto Append(const Property &property) -> void; auto Append(const Property &property) -> void;
::google::protobuf::RepeatedPtrField<caosdb::entity::v1alpha1::Property> RepeatedPtrField<caosdb::entity::v1alpha1::Property> *wrapped;
*wrapped;
}; };
/** /**
...@@ -394,12 +400,18 @@ public: ...@@ -394,12 +400,18 @@ public:
this->wrapped->CopyFrom(*original.wrapped); this->wrapped->CopyFrom(*original.wrapped);
}; };
explicit Entity(IdResponse *idResponse); explicit Entity(IdResponse *idResponse);
explicit inline Entity(ProtoEntity *wrapped) : wrapped(wrapped) { explicit Entity(ProtoEntity *wrapped) : wrapped(wrapped) {
errors.wrapped = this->wrapped->mutable_errors();
warnings.wrapped = this->wrapped->mutable_warnings();
infos.wrapped = this->wrapped->mutable_infos();
properties.wrapped = this->wrapped->mutable_properties(); properties.wrapped = this->wrapped->mutable_properties();
parents.wrapped = this->wrapped->mutable_parents(); parents.wrapped = this->wrapped->mutable_parents();
errors.wrapped = CreateMessagesField();
warnings.wrapped = CreateMessagesField();
infos.wrapped = CreateMessagesField();
};
explicit inline Entity(EntityResponse *response)
: Entity(response->release_entity()) {
errors.wrapped->Swap(response->mutable_errors());
warnings.wrapped->Swap(response->mutable_warnings());
infos.wrapped->Swap(response->mutable_infos());
}; };
[[nodiscard]] inline auto GetId() const noexcept -> const std::string & { [[nodiscard]] inline auto GetId() const noexcept -> const std::string & {
...@@ -471,21 +483,59 @@ public: ...@@ -471,21 +483,59 @@ public:
*/ */
auto CopyTo(ProtoEntity *target) -> void; auto CopyTo(ProtoEntity *target) -> void;
auto SetFileTransmissionId(const std::string &registration_id,
const std::string &file_id) -> void;
auto SetFilePath(const std::string &path) -> void; auto SetFilePath(const std::string &path) -> void;
inline auto GetFileTransmissionId() const -> const FileTransmissionId & { inline auto GetFileTransmissionId() const -> const FileTransmissionId & {
return *(this->file_transmission_id); return *(this->file_transmission_id);
} }
inline auto HasFile() const -> bool { inline auto HasFile() const -> bool {
return this->file_transmission_id != nullptr; return !this->file_descriptor.local_path.empty();
}
auto SetFileTransmissionRegistrationId(const std::string &registration_id)
-> void;
inline auto SetFileTransmissionId(FileTransmissionId *file_transmission_id)
-> void {
file_transmission_id->set_file_id(GetNextFileId());
file_descriptor.file_transmission_id = file_transmission_id;
}
inline auto GetFileDescriptor() -> FileDescriptor & {
return this->file_descriptor;
}
inline auto SetLocalPath(const boost::filesystem::path &local_path) noexcept
-> StatusCode {
if (GetRole() != "File") {
return StatusCode::NOT_A_FILE_ENTITY;
}
if (!exists(local_path)) {
return StatusCode::FILE_DOES_NOT_EXIST_LOCALLY;
}
if (is_directory(local_path)) {
return StatusCode::PATH_IS_A_DIRECTORY;
}
this->file_descriptor.local_path = local_path;
return StatusCode::SUCCESS;
} }
private: private:
inline auto GetNextFileId() -> std::string {
std::string str = "0123456789abcdef";
std::mt19937 generator(std::random_device{}());
std::uniform_int_distribution<int> distribution(0, str.size() - 1);
std::string result(10, '\0');
for (auto &dis : result) {
dis = str[distribution(generator)];
}
return result;
}
static auto CreateProtoEntity() -> ProtoEntity *; static auto CreateProtoEntity() -> ProtoEntity *;
static auto CreateMessagesField() -> RepeatedPtrField<ProtoMessage> *;
auto SetId(const std::string &id) -> void; auto SetId(const std::string &id) -> void;
auto SetVersionId(const std::string &id) -> void; auto SetVersionId(const std::string &id) -> void;
FileTransmissionId *file_transmission_id = nullptr; FileTransmissionId *file_transmission_id = nullptr;
FileDescriptor file_descriptor;
ProtoEntity *wrapped; ProtoEntity *wrapped;
Properties properties; Properties properties;
Parents parents; Parents parents;
......
#include "caosdb/filestreaming/HandlerInterface.h"
#include "caosdb/entity/v1alpha1/main.grpc.pb.h"
#include "caosdb/entity.h"
#include "caosdb/status_code.h"
#include <grpcpp/grpcpp.h>
#include <iostream>
#include <memory>
#include <string>
namespace FileExchange {
using caosdb::StatusCode;
using caosdb::entity::FileDescriptor;
using caosdb::entity::v1alpha1::FileTransmissionService;
class FileExchangeClient final {
public:
FileExchangeClient(
const std::shared_ptr<FileTransmissionService::Stub> &service)
: stub_(service) {}
~FileExchangeClient();
FileExchangeClient(const FileExchangeClient &) = delete;
FileExchangeClient &operator=(const FileExchangeClient &) = delete;
FileExchangeClient(FileExchangeClient &&) = delete;
FileExchangeClient &operator=(FileExchangeClient &&) = delete;
StatusCode upload(const FileDescriptor &file_descriptor);
StatusCode download(const FileDescriptor &file_descriptor);
void cancel();
private:
int processMessages();
grpc::CompletionQueue cq_;
std::shared_ptr<FileTransmissionService::Stub> stub_;
std::unique_ptr<HandlerInterface> handler_;
};
} // namespace FileExchange
#include "caosdb/filestreaming/HandlerInterface.h"
#include "caosdb/entity/v1alpha1/main.grpc.pb.h"
#include "caosdb/entity.h"
#include "caosdb/filestreaming/FileWriter.h"
#include <grpcpp/grpcpp.h>
namespace FileExchange {
using caosdb::entity::FileDescriptor;
using caosdb::entity::v1alpha1::FileDownloadRequest;
using caosdb::entity::v1alpha1::FileDownloadResponse;
using caosdb::entity::v1alpha1::FileTransmissionService;
class DownloadRequestHandler : public HandlerInterface {
public:
DownloadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub,
grpc::CompletionQueue *cq,
const FileDescriptor &file_descriptor);
~DownloadRequestHandler() override = default;
DownloadRequestHandler(const DownloadRequestHandler &) = delete;
DownloadRequestHandler &operator=(const DownloadRequestHandler &) = delete;
DownloadRequestHandler(DownloadRequestHandler &&) = delete;
DownloadRequestHandler &operator=(DownloadRequestHandler &&) = delete;
bool onNext(bool ok) override;
void cancel() override;
protected:
enum class CallState { NewCall, SendingRequest, ReceivingFile, CallComplete };
void handleNewCallState();
void handleSendingRequestState();
void handleReceivingFileState();
void handleCallCompleteState();
HandlerTag tag_;
FileTransmissionService::Stub *stub_;
grpc::CompletionQueue *cq_;
grpc::ClientContext ctx_;
std::unique_ptr<grpc::ClientAsyncReader<FileDownloadResponse>> rpc_;
FileDownloadRequest *request_;
FileDownloadResponse *response_;
grpc::Status status_;
CallState state_;
std::unique_ptr<FileWriter> fileWriter_;
FileDescriptor file_descriptor_;
unsigned long long bytesReceived_;
};
} // namespace FileExchange
#pragma once
#include <stdexcept>
#include <string>
namespace FileExchange {
class FileLockError : public std::runtime_error {
public:
FileLockError(const std::string &message) : std::runtime_error(message) {}
};
class FileIOError : public std::runtime_error {
public:
FileIOError(const std::string &message) : std::runtime_error(message) {}
};
class FileNotManagedError : public std::runtime_error {
public:
FileNotManagedError(const std::string &message)
: std::runtime_error(message) {}
};
} // namespace FileExchange
#pragma once
#include <mutex>
#include <shared_mutex>
namespace FileExchange {
using FileMutex = std::shared_timed_mutex;
using FileReadLock = std::shared_lock<FileMutex>;
using FileWriteLock = std::unique_lock<FileMutex>;
} // namespace FileExchange
#pragma once
#include "FileLock.h"
#include "FileReader.h"
#include "FileWriter.h"
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
namespace FileExchange {
class FileManager final {
public:
FileManager() = default;
FileManager(const std::string &filename);
template <class InputIt> FileManager(InputIt first, InputIt last);
~FileManager() = default;
// FileManager is not copyable
FileManager(const FileManager &) = delete;
FileManager &operator=(const FileManager &) = delete;
// FileManager is movable
FileManager(FileManager &&) = default;
FileManager &operator=(FileManager &&) = default;
bool isManaged(const std::string &filename) const;
std::unique_ptr<FileReader> readFile(const std::string &filename) const;
std::unique_ptr<FileWriter> writeFile(const std::string &filename);
private:
void manageFile(const std::string &filename);
mutable std::unordered_map<std::string, std::shared_ptr<FileMutex>>
fileMutexes_;
mutable std::mutex mgrMutex_;
};
template <class InputIt> FileManager::FileManager(InputIt first, InputIt last) {
while (first != last) {
const auto &filename = *first;
this->manageFile(filename);
++first;
}
}
} // namespace FileExchange
#pragma once
#include "caosdb/filestreaming/FileLock.h"
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/string_file.hpp>
#include <fstream>
#include <memory>
#include <string>
namespace FileExchange {
using boost::filesystem::exists;
using boost::filesystem::ifstream;
using boost::filesystem::path;
class FileReader final {
public:
FileReader(const boost::filesystem::path &filename);
FileReader(const boost::filesystem::path &filename,
const std::shared_ptr<FileMutex> &mutexPtr);
~FileReader() = default;
FileReader(const FileReader &) = delete;
FileReader &operator=(const FileReader &) = delete;
FileReader(FileReader &&) = default;
FileReader &operator=(FileReader &&) = default;
unsigned long long fileSize() const { return size_; }
std::size_t read(std::string &buffer);
private:
void openFile();
std::ifstream stream_;
boost::filesystem::path filename_;
unsigned long long size_;
std::shared_ptr<FileMutex> mutexPtr_;
FileReadLock lock_;
};
} // namespace FileExchange
#pragma once
#include "FileLock.h"
#include <fstream>
#include <memory>
#include <string>
namespace FileExchange {
class FileWriter final {
public:
FileWriter(const std::string &filename);
FileWriter(const std::string &filename,
const std::shared_ptr<FileMutex> &mutexPtr);
~FileWriter() = default;
FileWriter(const FileWriter &) = delete;
FileWriter &operator=(const FileWriter &) = delete;
FileWriter(FileWriter &&) = default;
FileWriter &operator=(FileWriter &&) = default;
void write(const std::string &buffer);
private:
void openFile();
std::ofstream stream_;
std::string filename_;
std::shared_ptr<FileMutex> mutexPtr_;
FileWriteLock lock_;
};
} // namespace FileExchange
#pragma once
#include <memory>
#include <string>
namespace FileExchange {
const std::string logger_name("caosdb::transaction::file_transmission");
class HandlerInterface {
public:
virtual ~HandlerInterface() = default;
virtual bool onNext(bool ok) = 0;
virtual void cancel() = 0;
};
using HandlerPtr = std::unique_ptr<HandlerInterface>;
using HandlerTag = HandlerPtr *;
} // namespace FileExchange
MIT License
Copyright (c) 2019 NeiRo21
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
#pragma once
#include <grpcpp/grpcpp.h>
namespace FileExchange {
namespace RequestStatus {
extern const grpc::Status FileHeaderExpected;
extern const grpc::Status FileNameEmpty;
extern const grpc::Status FileChunkExpected;
extern const grpc::Status FileNotFound;
extern const grpc::Status FileLocked;
extern const grpc::Status FileIOError;
extern const grpc::Status UnknownError;
} // namespace RequestStatus
} // namespace FileExchange
#include "caosdb/filestreaming/HandlerInterface.h"
#include "caosdb/entity.h"
#include "caosdb/entity/v1alpha1/main.grpc.pb.h"
#include "caosdb/filestreaming/FileManager.h"
#include <grpcpp/grpcpp.h>
namespace FileExchange {
using caosdb::entity::FileDescriptor;
using caosdb::entity::v1alpha1::FileTransmissionService;
using caosdb::entity::v1alpha1::FileUploadRequest;
using caosdb::entity::v1alpha1::FileUploadResponse;
class UploadRequestHandler : public HandlerInterface {
public:
UploadRequestHandler(HandlerTag tag, FileTransmissionService::Stub *stub,
grpc::CompletionQueue *cq,
const FileDescriptor &file_descriptor);
~UploadRequestHandler() override = default;
UploadRequestHandler(const UploadRequestHandler &) = delete;
UploadRequestHandler &operator=(const UploadRequestHandler &) = delete;
UploadRequestHandler(UploadRequestHandler &&) = delete;
UploadRequestHandler &operator=(UploadRequestHandler &&) = delete;
bool onNext(bool ok) override;
void cancel() override;
protected:
enum class CallState {
NewCall,
SendingHeader,
SendingFile,
ExpectingResponse,
CallComplete
};
void handleNewCallState();
void handleSendingHeaderState();
void handleSendingFileState();
void handleExpectingResponseState();
void handleCallCompleteState();
HandlerTag tag_;
FileTransmissionService::Stub *stub_;
grpc::CompletionQueue *cq_;
grpc::ClientContext ctx_;
std::unique_ptr<grpc::ClientAsyncWriter<FileUploadRequest>> rpc_;
FileUploadRequest *request_;
FileUploadResponse *response_;
grpc::Status status_;
CallState state_;
std::unique_ptr<FileReader> fileReader_;
FileDescriptor file_descriptor_;
unsigned long long bytesToSend_;
};
} // namespace FileExchange
...@@ -52,6 +52,11 @@ enum StatusCode { ...@@ -52,6 +52,11 @@ enum StatusCode {
TRANSACTION_TYPE_ERROR = 26, TRANSACTION_TYPE_ERROR = 26,
UNSUPPORTED_FEATURE = 27, UNSUPPORTED_FEATURE = 27,
ORIGINAL_ENTITY_MISSING_ID = 28, ORIGINAL_ENTITY_MISSING_ID = 28,
NOT_A_FILE_ENTITY = 29,
PATH_IS_A_DIRECTORY = 30,
FILE_DOES_NOT_EXIST_LOCALLY = 31,
FILE_UPLOAD_ERROR = 32,
FILE_DOWNLOAD_ERROR = 33,
}; };
auto get_status_description(int code) -> const std::string &; auto get_status_description(int code) -> const std::string &;
......
...@@ -160,17 +160,18 @@ ...@@ -160,17 +160,18 @@
namespace caosdb::transaction { namespace caosdb::transaction {
using caosdb::entity::Entity; using caosdb::entity::Entity;
using caosdb::entity::FileDescriptor; using caosdb::entity::FileDescriptor;
using ProtoEntity = caosdb::entity::v1alpha1::Entity; using caosdb::entity::v1alpha1::EntityResponse;
using caosdb::entity::v1alpha1::EntityTransactionService; using caosdb::entity::v1alpha1::EntityTransactionService;
using caosdb::entity::v1alpha1::FileDownloadRequest;
using caosdb::entity::v1alpha1::FileDownloadResponse; using caosdb::entity::v1alpha1::FileDownloadResponse;
using caosdb::entity::v1alpha1::FileTransmissionId;
using caosdb::entity::v1alpha1::FileTransmissionService; using caosdb::entity::v1alpha1::FileTransmissionService;
using caosdb::entity::v1alpha1::FileUploadRequest; using caosdb::entity::v1alpha1::FileUploadRequest;
using caosdb::entity::v1alpha1::FileUploadResponse; using caosdb::entity::v1alpha1::FileUploadResponse;
using caosdb::entity::v1alpha1::IdResponse; using caosdb::entity::v1alpha1::IdResponse;
using caosdb::entity::v1alpha1::MultiTransactionRequest; using caosdb::entity::v1alpha1::MultiTransactionRequest;
using caosdb::entity::v1alpha1::MultiTransactionResponse; using caosdb::entity::v1alpha1::MultiTransactionResponse;
using caosdb::entity::v1alpha1::RegisterFileDownloadRequest; using caosdb::entity::v1alpha1::RegisterFileUploadRequest;
using caosdb::entity::v1alpha1::RegisterFileDownloadResponse;
using caosdb::entity::v1alpha1::RegisterFileUploadResponse; using caosdb::entity::v1alpha1::RegisterFileUploadResponse;
using caosdb::transaction::TransactionStatus; using caosdb::transaction::TransactionStatus;
using WrappedResponseCase = using WrappedResponseCase =
...@@ -242,10 +243,11 @@ template <class T> auto ResultSet<T>::end() const -> ResultSet<T>::iterator { ...@@ -242,10 +243,11 @@ template <class T> auto ResultSet<T>::end() const -> ResultSet<T>::iterator {
return ResultSet<T>::iterator(this, Size()); return ResultSet<T>::iterator(this, Size());
} }
template <class T> class IMultiResultSet : public ResultSet<T> { template <class T> class AbstractMultiResultSet : public ResultSet<T> {
public: public:
virtual ~IMultiResultSet() = default; virtual ~AbstractMultiResultSet() = default;
inline explicit IMultiResultSet(std::vector<std::unique_ptr<T>> result_set) inline explicit AbstractMultiResultSet(
std::vector<std::unique_ptr<T>> result_set)
: items(std::move(result_set)) {} : items(std::move(result_set)) {}
[[nodiscard]] inline auto Size() const noexcept -> int override { [[nodiscard]] inline auto Size() const noexcept -> int override {
return this->items.size(); return this->items.size();
...@@ -258,7 +260,10 @@ protected: ...@@ -258,7 +260,10 @@ protected:
std::vector<std::unique_ptr<T>> items; std::vector<std::unique_ptr<T>> items;
}; };
class FilesResultSet : public IMultiResultSet<FileDescriptor> { /**
* Container of files which have been downloaded during a transaction.
*/
class FilesResultSet : public AbstractMultiResultSet<FileDescriptor> {
public: public:
~FilesResultSet() = default; ~FilesResultSet() = default;
explicit FilesResultSet( explicit FilesResultSet(
...@@ -271,7 +276,7 @@ public: ...@@ -271,7 +276,7 @@ public:
* In contrast to UniqueResult, this one can also hold multiple entities or zero * In contrast to UniqueResult, this one can also hold multiple entities or zero
* entities. * entities.
*/ */
class MultiResultSet : public IMultiResultSet<Entity> { class MultiResultSet : public AbstractMultiResultSet<Entity> {
public: public:
~MultiResultSet() = default; ~MultiResultSet() = default;
explicit MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set); explicit MultiResultSet(std::vector<std::unique_ptr<Entity>> result_set);
...@@ -286,8 +291,8 @@ public: ...@@ -286,8 +291,8 @@ public:
class UniqueResult : public ResultSet<Entity> { class UniqueResult : public ResultSet<Entity> {
public: public:
~UniqueResult() = default; ~UniqueResult() = default;
explicit inline UniqueResult(ProtoEntity *protoEntity) explicit inline UniqueResult(EntityResponse *entityResponse)
: entity(new Entity(protoEntity)){}; : entity(new Entity(entityResponse)){};
explicit inline UniqueResult(IdResponse *idResponse) explicit inline UniqueResult(IdResponse *idResponse)
: entity(new Entity(idResponse)){}; : entity(new Entity(idResponse)){};
[[nodiscard]] auto GetEntity() const -> const Entity &; [[nodiscard]] auto GetEntity() const -> const Entity &;
...@@ -309,14 +314,6 @@ private: ...@@ -309,14 +314,6 @@ private:
*/ */
class Transaction { class Transaction {
public: public:
auto UploadFile(FileUploadResponse *response,
const std::string &registration_id) -> void;
auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void;
auto DownloadFile(FileDownloadResponse *response,
const RegisterFileDownloadResponse &registration_response)
-> void;
auto RegisterDownloadFile(const RegisterFileDownloadRequest &request,
RegisterFileDownloadResponse *response) -> void;
/** /**
* The transaction type restricts the kind of sub-transaction which may be * The transaction type restricts the kind of sub-transaction which may be
* added to a transaction (insertion, update, deletion, retrieval). * added to a transaction (insertion, update, deletion, retrieval).
...@@ -347,10 +344,6 @@ public: ...@@ -347,10 +344,6 @@ public:
*/ */
auto RetrieveAndDownloadFilesById(const std::string &id) noexcept auto RetrieveAndDownloadFilesById(const std::string &id) noexcept
-> StatusCode; -> StatusCode;
auto DownloadFilesById(const std::string &path) noexcept -> StatusCode;
auto RetrieveAndDownloadFilesByQuery(const std::string &query) noexcept
-> StatusCode;
auto DownloadFilesByQuery(const std::string &query) noexcept -> StatusCode;
/** /**
* Return a ResultSet<FileDescriptor>. * Return a ResultSet<FileDescriptor>.
...@@ -493,7 +486,16 @@ public: ...@@ -493,7 +486,16 @@ public:
return out; return out;
} }
std::vector<FileDescriptor> upload_files;
std::vector<FileDescriptor> download_files;
private: private:
auto RegisterUploadFile(RegisterFileUploadResponse *response) -> void;
auto UploadFile(FileUploadResponse *response,
const FileDescriptor &file_descriptor,
const std::string &registration_id) -> void;
auto DownloadFile(FileDownloadResponse *response,
const FileTransmissionId &file_transmission_id) -> void;
bool has_query = false; bool has_query = false;
TransactionType transaction_type = TransactionType::NONE; TransactionType transaction_type = TransactionType::NONE;
mutable std::unique_ptr<ResultSet<Entity>> result_set; mutable std::unique_ptr<ResultSet<Entity>> result_set;
......
...@@ -119,6 +119,14 @@ public: ...@@ -119,6 +119,14 @@ public:
caosdb::get_status_description(StatusCode::AUTHENTICATION_ERROR) + caosdb::get_status_description(StatusCode::AUTHENTICATION_ERROR) +
" Original error: " + details); " Original error: " + details);
} }
/**
* Factory for a FILE_UPLOAD_ERROR status.
*
* This status means that the transaction failed during the upload of the
* file blobs of file entities.
*/
CAOSDB_TRANSACTION_STATUS_DEFAULT_FACTORY(FILE_UPLOAD_ERROR,
StatusCode::FILE_UPLOAD_ERROR);
/** /**
* Factory for a TRANSACTION_ERROR status. * Factory for a TRANSACTION_ERROR status.
* *
......
...@@ -33,6 +33,8 @@ ...@@ -33,6 +33,8 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <string_view> #include <string_view>
#include <mutex>
#include <shared_mutex>
namespace caosdb::utility { namespace caosdb::utility {
using boost::filesystem::exists; using boost::filesystem::exists;
......
Subproject commit d78d4c76f5cd08dd418e1ab42183d1172cb0b383 Subproject commit ab5494165946c032325a2d37dec1d563ffc8a959
...@@ -28,6 +28,13 @@ set(libcaosdb_SRC ...@@ -28,6 +28,13 @@ set(libcaosdb_SRC
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/configuration.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/configuration.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/protobuf_helper.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/protobuf_helper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.cpp ${CMAKE_CURRENT_SOURCE_DIR}/caosdb/transaction.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/Client.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/UploadRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/DownloadRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/RequestStatus.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileWriter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileReader.cpp
${CMAKE_CURRENT_SOURCE_DIR}/caosdb/filestreaming/FileManager.cpp
) )
# pass variable to parent scope # pass variable to parent scope
......
...@@ -29,7 +29,10 @@ using caosdb::entity::v1alpha1::IdResponse; ...@@ -29,7 +29,10 @@ using caosdb::entity::v1alpha1::IdResponse;
using ProtoParent = caosdb::entity::v1alpha1::Parent; using ProtoParent = caosdb::entity::v1alpha1::Parent;
using ProtoProperty = caosdb::entity::v1alpha1::Property; using ProtoProperty = caosdb::entity::v1alpha1::Property;
using ProtoEntity = caosdb::entity::v1alpha1::Entity; using ProtoEntity = caosdb::entity::v1alpha1::Entity;
using ProtoMessage = caosdb::entity::v1alpha1::Message;
using ProtoFileDescriptor = caosdb::entity::v1alpha1::FileDescriptor;
using caosdb::utility::get_arena; using caosdb::utility::get_arena;
using google::protobuf::Arena;
Parent::Parent() : wrapped(Parent::CreateProtoParent()) { Parent::Parent() : wrapped(Parent::CreateProtoParent()) {
// TODO(fspreck) Re-enable once we have decided how to attach // TODO(fspreck) Re-enable once we have decided how to attach
...@@ -40,7 +43,7 @@ Parent::Parent() : wrapped(Parent::CreateProtoParent()) { ...@@ -40,7 +43,7 @@ Parent::Parent() : wrapped(Parent::CreateProtoParent()) {
} }
auto Parent::CreateProtoParent() -> ProtoParent * { auto Parent::CreateProtoParent() -> ProtoParent * {
return google::protobuf::Arena::CreateMessage<ProtoParent>(get_arena()); return Arena::CreateMessage<ProtoParent>(get_arena());
} }
auto Parent::SetName(const std::string &name) -> void { auto Parent::SetName(const std::string &name) -> void {
...@@ -71,7 +74,7 @@ auto Parents::Append(const Parent &parent) -> void { ...@@ -71,7 +74,7 @@ auto Parents::Append(const Parent &parent) -> void {
Property::Property() : wrapped(Property::CreateProtoProperty()) {} Property::Property() : wrapped(Property::CreateProtoProperty()) {}
auto Property::CreateProtoProperty() -> ProtoProperty * { auto Property::CreateProtoProperty() -> ProtoProperty * {
return google::protobuf::Arena::CreateMessage<ProtoProperty>(get_arena()); return Arena::CreateMessage<ProtoProperty>(get_arena());
} }
[[nodiscard]] auto Property::GetId() const -> const std::string & { [[nodiscard]] auto Property::GetId() const -> const std::string & {
...@@ -152,23 +155,20 @@ auto Entity::AppendProperty(const Property &property) -> void { ...@@ -152,23 +155,20 @@ auto Entity::AppendProperty(const Property &property) -> void {
} }
auto Entity::CreateProtoEntity() -> ProtoEntity * { auto Entity::CreateProtoEntity() -> ProtoEntity * {
return google::protobuf::Arena::CreateMessage<ProtoEntity>(get_arena()); return Arena::CreateMessage<ProtoEntity>(get_arena());
} }
Entity::Entity() : wrapped(Entity::CreateProtoEntity()) { auto Entity::CreateMessagesField() -> RepeatedPtrField<ProtoMessage> * {
properties.wrapped = this->wrapped->mutable_properties(); return Arena::CreateMessage<RepeatedPtrField<ProtoMessage>>(get_arena());
parents.wrapped = this->wrapped->mutable_parents();
errors.wrapped = this->wrapped->mutable_errors();
warnings.wrapped = this->wrapped->mutable_warnings();
infos.wrapped = this->wrapped->mutable_infos();
} }
Entity::Entity() : Entity(Entity::CreateProtoEntity()) {}
Entity::Entity(IdResponse *idResponse) : Entity() { Entity::Entity(IdResponse *idResponse) : Entity() {
this->wrapped->set_id(idResponse->id()); this->wrapped->set_id(idResponse->id());
this->wrapped->mutable_errors()->Swap(idResponse->mutable_entity_errors()); this->errors.wrapped->Swap(idResponse->mutable_errors());
this->wrapped->mutable_warnings()->Swap( this->warnings.wrapped->Swap(idResponse->mutable_warnings());
idResponse->mutable_entity_warnings()); this->infos.wrapped->Swap(idResponse->mutable_infos());
this->wrapped->mutable_infos()->Swap(idResponse->mutable_entity_infos());
} }
auto Entity::SetId(const std::string &id) -> void { this->wrapped->set_id(id); } auto Entity::SetId(const std::string &id) -> void { this->wrapped->set_id(id); }
...@@ -205,13 +205,13 @@ auto Entity::SetFilePath(const std::string &path) -> void { ...@@ -205,13 +205,13 @@ auto Entity::SetFilePath(const std::string &path) -> void {
this->wrapped->mutable_file_descriptor()->set_path(path); this->wrapped->mutable_file_descriptor()->set_path(path);
} }
auto Entity::SetFileTransmissionId(const std::string &registration_id, // auto Entity::SetFileTransmissionId(const std::string &registration_id,
const std::string &file_id) -> void { // const std::string &file_id) -> void {
this->file_transmission_id = // this->file_transmission_id =
google::protobuf::Arena::CreateMessage<FileTransmissionId>(get_arena()); // Arena::CreateMessage<FileTransmissionId>(get_arena());
this->file_transmission_id->set_registration_id(registration_id); // this->file_transmission_id->set_registration_id(registration_id);
this->file_transmission_id->set_file_id(file_id); // this->file_transmission_id->set_file_id(file_id);
} //}
} // namespace caosdb::entity } // namespace caosdb::entity
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment