Skip to content
Snippets Groups Projects
Verified Commit 5800839e authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP:files

parent 0e4aa147
No related branches found
No related tags found
3 merge requests!44Release 0.6,!43Merge f-GRPC-main to dev,!26F grpc f files
caosdb-proto @ ab549416
Subproject commit d78d4c76f5cd08dd418e1ab42183d1172cb0b383 Subproject commit ab5494165946c032325a2d37dec1d563ffc8a959
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>org.caosdb</groupId> <groupId>org.caosdb</groupId>
<artifactId>caosdb-server</artifactId> <artifactId>caosdb-server</artifactId>
<version>0.5.0-GRPC0.0.10</version> <version>0.5.0-GRPC0.0.11</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>CaosDB Server</name> <name>CaosDB Server</name>
<scm> <scm>
......
package org.caosdb.server.grpc; package org.caosdb.server.grpc;
import org.caosdb.api.entity.v1alpha1.FileDescriptor; import com.google.protobuf.ByteString;
import org.caosdb.api.entity.v1alpha1.FileDescriptor.Builder; import java.io.FileInputStream;
import org.caosdb.api.entity.v1alpha1.FileDownloadHeader; import java.io.FileNotFoundException;
import org.caosdb.api.entity.v1alpha1.FileIdentifier; import java.io.IOException;
import org.caosdb.server.entity.EntityInterface; import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import org.caosdb.api.entity.v1alpha1.FileChunk;
import org.caosdb.api.entity.v1alpha1.FileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.entity.FileProperties; import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.entity.Message;
public class DownloadBuffer { public class DownloadBuffer {
private final EntityInterface entity; private final FileProperties file_properties;
private final FileIdentifier fileIdentifier; private FileInputStream fileInputStream;
private FileChannel fileChannel;
public DownloadBuffer(final FileIdentifier fileIdentifier, final EntityInterface entity) { public DownloadBuffer(final FileProperties file_properties) {
this.fileIdentifier = fileIdentifier; this.file_properties = file_properties;
this.entity = entity; this.fileInputStream = null;
} }
public FileProperties getFileProperties() { public FileProperties getFileProperties() {
return entity.getFileProperties(); return file_properties;
} }
public FileDownloadHeader getFileDownloadHeader() { public FileDownloadResponse getNextChunk() throws FileNotFoundException, IOException {
final FileDownloadHeader.Builder builder = FileDownloadHeader.newBuilder(); if (fileChannel == null) {
if (entity.hasMessage(Message.MessageType.Error.toString())) { fileInputStream = new FileInputStream(file_properties.getFile());
builder.addAllErrors( fileChannel = fileInputStream.getChannel();
EntityTransactionServiceImpl.convert(
entity.getMessages(Message.MessageType.Error.toString())));
} }
if (entity.hasMessage(Message.MessageType.Warning.toString())) { final MappedByteBuffer map =
builder.addAllWarnings( fileChannel.map(MapMode.READ_ONLY, fileChannel.position(), getChunkSize());
EntityTransactionServiceImpl.convert(
entity.getMessages(Message.MessageType.Warning.toString()))); final FileChunk.Builder builder = FileChunk.newBuilder();
builder.setData(ByteString.copyFrom(map));
final TransmissionStatus status;
if (fileInputStream.available() > 0) {
status = TransmissionStatus.TRANSMISSION_STATUS_GO_ON;
} else {
status = TransmissionStatus.TRANSMISSION_STATUS_SUCCESS;
cleanUp();
} }
if (entity.hasMessage(Message.MessageType.Info.toString())) { return FileDownloadResponse.newBuilder().setChunk(builder).setStatus(status).build();
builder.addAllInfos( }
EntityTransactionServiceImpl.convert(
entity.getMessages(Message.MessageType.Info.toString()))); public void cleanUp() {
try {
if (fileChannel != null && fileChannel.isOpen()) {
fileChannel.close();
}
if (fileInputStream != null) {
fileInputStream.close();
}
} catch (final IOException e) {
e.printStackTrace();
} }
builder.setId(fileIdentifier);
builder.setFileDescriptor(convert(entity.getFileProperties()));
return builder.build();
} }
private FileDescriptor convert(final FileProperties fileProperties) { private int getChunkSize() {
final Builder builder = FileDescriptor.newBuilder(); // 16kB
builder.setEntityId(entity.getId().toString()); return 16384;
builder.setPath(entity.getFileProperties().getPath());
builder.setSize(entity.getFileProperties().getSize());
// TODO Hash
return builder.build();
} }
} }
...@@ -8,21 +8,27 @@ import java.util.LinkedList; ...@@ -8,21 +8,27 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.apache.shiro.SecurityUtils; import org.apache.shiro.SecurityUtils;
import org.caosdb.api.entity.v1alpha1.DeleteRequest;
import org.caosdb.api.entity.v1alpha1.DeleteResponse;
import org.caosdb.api.entity.v1alpha1.Entity; import org.caosdb.api.entity.v1alpha1.Entity;
import org.caosdb.api.entity.v1alpha1.Entity.Builder; import org.caosdb.api.entity.v1alpha1.Entity.Builder;
import org.caosdb.api.entity.v1alpha1.EntityRequest; import org.caosdb.api.entity.v1alpha1.EntityRequest;
import org.caosdb.api.entity.v1alpha1.EntityResponse;
import org.caosdb.api.entity.v1alpha1.EntityTransactionServiceGrpc.EntityTransactionServiceImplBase; import org.caosdb.api.entity.v1alpha1.EntityTransactionServiceGrpc.EntityTransactionServiceImplBase;
import org.caosdb.api.entity.v1alpha1.FileDescriptor; import org.caosdb.api.entity.v1alpha1.FileDescriptor;
import org.caosdb.api.entity.v1alpha1.IdResponse; import org.caosdb.api.entity.v1alpha1.IdResponse;
import org.caosdb.api.entity.v1alpha1.InsertRequest;
import org.caosdb.api.entity.v1alpha1.InsertResponse;
import org.caosdb.api.entity.v1alpha1.MessageCode; import org.caosdb.api.entity.v1alpha1.MessageCode;
import org.caosdb.api.entity.v1alpha1.MultiTransactionRequest; import org.caosdb.api.entity.v1alpha1.MultiTransactionRequest;
import org.caosdb.api.entity.v1alpha1.MultiTransactionResponse; import org.caosdb.api.entity.v1alpha1.MultiTransactionResponse;
import org.caosdb.api.entity.v1alpha1.Parent; import org.caosdb.api.entity.v1alpha1.Parent;
import org.caosdb.api.entity.v1alpha1.QueryOrIdRequest;
import org.caosdb.api.entity.v1alpha1.RetrieveResponse; import org.caosdb.api.entity.v1alpha1.RetrieveResponse;
import org.caosdb.api.entity.v1alpha1.TransactionRequest; import org.caosdb.api.entity.v1alpha1.TransactionRequest;
import org.caosdb.api.entity.v1alpha1.TransactionRequest.WrappedRequestsCase; import org.caosdb.api.entity.v1alpha1.TransactionRequest.WrappedRequestsCase;
import org.caosdb.api.entity.v1alpha1.TransactionResponse; import org.caosdb.api.entity.v1alpha1.TransactionResponse;
import org.caosdb.api.entity.v1alpha1.UpdateRequest;
import org.caosdb.api.entity.v1alpha1.UpdateResponse;
import org.caosdb.api.entity.v1alpha1.Version; import org.caosdb.api.entity.v1alpha1.Version;
import org.caosdb.server.datatype.GenericValue; import org.caosdb.server.datatype.GenericValue;
import org.caosdb.server.datatype.Value; import org.caosdb.server.datatype.Value;
...@@ -64,39 +70,44 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -64,39 +70,44 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
return null; return null;
} }
public Entity convert(final EntityInterface from) { public EntityResponse.Builder convert(final EntityInterface from) {
final Builder builder = Entity.newBuilder();
final Builder entityBuilder = Entity.newBuilder();
if (from.getRole() != null) { if (from.getRole() != null) {
builder.setRole(from.getRole().toString()); entityBuilder.setRole(from.getRole().toString());
} }
if (from.hasId()) { if (from.hasId()) {
builder.setId(from.getId().toString()); entityBuilder.setId(from.getId().toString());
} }
if (from.hasVersion()) { if (from.hasVersion()) {
builder.setVersion(convert(from.getVersion())); entityBuilder.setVersion(convert(from.getVersion()));
} }
if (from.hasName()) { if (from.hasName()) {
builder.setName(from.getName()); entityBuilder.setName(from.getName());
} }
if (from.hasDescription()) { if (from.hasDescription()) {
builder.setDescription(from.getDescription()); entityBuilder.setDescription(from.getDescription());
} }
if (from.hasDatatype()) { if (from.hasDatatype()) {
builder.setDatatype(from.getDatatype().getName()); entityBuilder.setDatatype(from.getDatatype().getName());
} }
if (from.hasUnit()) { if (from.hasUnit()) {
builder.setUnit(getStringUnit(from)); entityBuilder.setUnit(getStringUnit(from));
} }
if (from.hasProperties()) { if (from.hasProperties()) {
builder.addAllProperties(convert(from.getProperties())); entityBuilder.addAllProperties(convert(from.getProperties()));
} }
if (from.hasParents()) { if (from.hasParents()) {
builder.addAllParents(convert(from.getParents())); entityBuilder.addAllParents(convert(from.getParents()));
} }
appendMessages(from, builder);
return builder.build(); final EntityResponse.Builder responseBuilder = EntityResponse.newBuilder();
responseBuilder.setEntity(entityBuilder);
appendMessages(from, responseBuilder);
return responseBuilder;
} }
public static Iterable<? extends org.caosdb.api.entity.v1alpha1.Message> convert( public static Iterable<? extends org.caosdb.api.entity.v1alpha1.Message> convert(
...@@ -223,17 +234,25 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -223,17 +234,25 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
final RetrieveContainer container = final RetrieveContainer container =
new RetrieveContainer( new RetrieveContainer(
SecurityUtils.getSubject(), getTimestamp(), getSRID(), new HashMap<>()); SecurityUtils.getSubject(), getTimestamp(), getSRID(), new HashMap<>());
FileDownload file_download = null;
for (final TransactionRequest sub_request : request.getRequestsList()) { for (final TransactionRequest sub_request : request.getRequestsList()) {
final boolean fileDownload = sub_request.getRetrieveRequest().getRegisterFileDownload();
if (sub_request.getRetrieveRequest().hasQuery() if (sub_request.getRetrieveRequest().hasQuery()
&& !sub_request.getRetrieveRequest().getQuery().getQuery().isBlank()) { && !sub_request.getRetrieveRequest().getQuery().getQuery().isBlank()) {
final String query = sub_request.getRetrieveRequest().getQuery().getQuery(); final String query = sub_request.getRetrieveRequest().getQuery().getQuery();
container.getFlags().put("query", query); container.getFlags().put("query", query);
if (fileDownload) {
container.getFlags().put("download_files", "true");
}
} else { } else {
final String id = sub_request.getRetrieveRequest().getId(); final String id = sub_request.getRetrieveRequest().getId();
if (!id.isBlank()) { if (!id.isBlank()) {
try { try {
final RetrieveEntity entity = new RetrieveEntity(getId(id)); final RetrieveEntity entity = new RetrieveEntity(getId(id));
if (fileDownload) {
entity.setFlag("download_files", "true");
}
container.add(entity); container.add(entity);
} catch (final NumberFormatException e) { } catch (final NumberFormatException e) {
// We handle this after the retrieval // We handle this after the retrieval
...@@ -250,10 +269,21 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -250,10 +269,21 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
.addResponsesBuilder() .addResponsesBuilder()
.setRetrieveResponse(RetrieveResponse.newBuilder().setCountResult(count)); .setRetrieveResponse(RetrieveResponse.newBuilder().setCountResult(count));
} else { } else {
final boolean download_files_container = container.getFlags().containsKey("download_files");
for (final EntityInterface entity : container) { for (final EntityInterface entity : container) {
final EntityResponse.Builder entityResponse = convert(entity);
if ((download_files_container || entity.getFlags().containsKey("download_files"))
&& entity.hasFileProperties()) {
if (file_download == null) {
file_download = fileTransmissionService.registerFileDownload(null);
}
entityResponse.setDownloadId(
fileTransmissionService.registerFileDownload(
file_download.getId(), entity.getFileProperties()));
}
builder builder
.addResponsesBuilder() .addResponsesBuilder()
.setRetrieveResponse(RetrieveResponse.newBuilder().setEntity(convert(entity))); .setRetrieveResponse(RetrieveResponse.newBuilder().setEntityResponse(entityResponse));
} }
} }
...@@ -270,17 +300,17 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -270,17 +300,17 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
builder.addResponses( builder.addResponses(
TransactionResponse.newBuilder() TransactionResponse.newBuilder()
.setRetrieveResponse( .setRetrieveResponse(
RetrieveResponse.newBuilder().setEntity(entityDoesNotExist(id)))); RetrieveResponse.newBuilder().setEntityResponse(entityDoesNotExist(id))));
} }
} }
} }
return builder.build(); return builder.build();
} }
private Entity entityDoesNotExist(final String id) { private EntityResponse entityDoesNotExist(final String id) {
return Entity.newBuilder() return EntityResponse.newBuilder()
.addErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)) .addErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST))
.setId(id) .setEntity(Entity.newBuilder().setId(id))
.build(); .build();
} }
...@@ -331,7 +361,7 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -331,7 +361,7 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
} }
} }
private IdResponse delete(final QueryOrIdRequest deleteRequest) throws Exception { private DeleteResponse delete(final DeleteRequest deleteRequest) throws Exception {
final String id = deleteRequest.getId(); final String id = deleteRequest.getId();
final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder = IdResponse.newBuilder(); final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder = IdResponse.newBuilder();
...@@ -349,36 +379,44 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -349,36 +379,44 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
} catch (final NumberFormatException e) { } catch (final NumberFormatException e) {
// ID wasn't an integer - the server doesn't support string id's yet, so that entity cannot // ID wasn't an integer - the server doesn't support string id's yet, so that entity cannot
// exist. // exist.
builder.addEntityErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)); builder.addErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST));
} }
return builder.build(); return DeleteResponse.newBuilder().setIdResponse(builder).build();
} }
private IdResponse insert(final EntityRequest entityRequest) throws Exception { private InsertResponse insert(final InsertRequest insertRequest) throws Exception {
final EntityRequest entityRequest = insertRequest.getEntityRequest();
final Entity insertEntity = entityRequest.getEntity(); final Entity insertEntity = entityRequest.getEntity();
final InsertEntity entity = final InsertEntity entity =
new InsertEntity( new InsertEntity(
insertEntity.getName().isEmpty() ? null : insertEntity.getName(), insertEntity.getName().isEmpty() ? null : insertEntity.getName(),
Role.parse(insertEntity.getRole())); Role.parse(insertEntity.getRole()));
return write(convert(insertEntity, entity), entityRequest); return InsertResponse.newBuilder()
.setIdResponse(write(convert(insertEntity, entity), entityRequest))
.build();
} }
private IdResponse update(final EntityRequest entityRequest) throws Exception { private UpdateResponse update(final UpdateRequest updateRequest) throws Exception {
final EntityRequest entityRequest = updateRequest.getEntityRequest();
final Entity updateEntity = entityRequest.getEntity(); final Entity updateEntity = entityRequest.getEntity();
final String id = updateEntity.getId(); final String id = updateEntity.getId();
try { try {
final UpdateEntity entity = new UpdateEntity(getId(id), Role.parse(updateEntity.getRole())); final UpdateEntity entity = new UpdateEntity(getId(id), Role.parse(updateEntity.getRole()));
entity.setName(updateEntity.getName().isEmpty() ? null : updateEntity.getName()); entity.setName(updateEntity.getName().isEmpty() ? null : updateEntity.getName());
return write(convert(updateEntity, entity), entityRequest); return UpdateResponse.newBuilder()
.setIdResponse(write(convert(updateEntity, entity), entityRequest))
.build();
} catch (final NumberFormatException e) { } catch (final NumberFormatException e) {
// ID wasn't an integer - the server doesn't support string id's yet, so that entity cannot // ID wasn't an integer - the server doesn't support string id's yet, so that entity cannot
// exist. // exist.
return IdResponse.newBuilder() return UpdateResponse.newBuilder()
.setId(id) .setIdResponse(
.addEntityErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)) IdResponse.newBuilder()
.setId(id)
.addErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)))
.build(); .build();
} }
} }
...@@ -407,7 +445,8 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -407,7 +445,8 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
} }
private void appendMessages( private void appendMessages(
final EntityInterface from, final org.caosdb.api.entity.v1alpha1.Entity.Builder builder) { final EntityInterface from,
final org.caosdb.api.entity.v1alpha1.EntityResponse.Builder builder) {
if (from.hasMessage(Message.MessageType.Error.toString())) { if (from.hasMessage(Message.MessageType.Error.toString())) {
builder.addAllErrors(convert(from.getMessages(Message.MessageType.Error.toString()))); builder.addAllErrors(convert(from.getMessages(Message.MessageType.Error.toString())));
} }
...@@ -422,14 +461,13 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -422,14 +461,13 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
private void appendMessages( private void appendMessages(
final EntityInterface from, final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder) { final EntityInterface from, final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder) {
if (from.hasMessage(Message.MessageType.Error.toString())) { if (from.hasMessage(Message.MessageType.Error.toString())) {
builder.addAllEntityErrors(convert(from.getMessages(Message.MessageType.Error.toString()))); builder.addAllErrors(convert(from.getMessages(Message.MessageType.Error.toString())));
} }
if (from.hasMessage(Message.MessageType.Warning.toString())) { if (from.hasMessage(Message.MessageType.Warning.toString())) {
builder.addAllEntityWarnings( builder.addAllWarnings(convert(from.getMessages(Message.MessageType.Warning.toString())));
convert(from.getMessages(Message.MessageType.Warning.toString())));
} }
if (from.hasMessage(Message.MessageType.Info.toString())) { if (from.hasMessage(Message.MessageType.Info.toString())) {
builder.addAllEntityInfos(convert(from.getMessages(Message.MessageType.Info.toString()))); builder.addAllInfos(convert(from.getMessages(Message.MessageType.Info.toString())));
} }
} }
......
package org.caosdb.server.grpc; package org.caosdb.server.grpc;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.shiro.SecurityUtils; import org.caosdb.api.entity.v1alpha1.FileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.FileDownloadHeader; import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings;
import org.caosdb.api.entity.v1alpha1.FileIdentifier;
import org.caosdb.server.entity.Entity;
import org.caosdb.server.entity.FileProperties; import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.entity.RetrieveEntity; import org.caosdb.server.utils.Utils;
import org.caosdb.server.entity.container.RetrieveContainer;
import org.caosdb.server.entity.container.TransactionContainer;
import org.caosdb.server.transaction.Retrieve;
import org.caosdb.server.transaction.RetrieveSparseEntityByPath;
import org.caosdb.server.transaction.Transaction;
public class FileDownload extends FileTransmission { public class FileDownload extends FileTransmission {
Map<String, DownloadBuffer> buffers = new HashMap<>(); Map<String, DownloadBuffer> buffers = new HashMap<>();
private final FileTransmissionSettings settings;
public FileDownload(final String id, final List<FileIdentifier> filesList) throws Exception { public FileDownload(final FileTransmissionSettings settings, final String id) throws Exception {
super(id); super(id);
final TransactionContainer byPathContainer = new TransactionContainer(); this.settings = settings;
final RetrieveContainer byIdContainer =
new RetrieveContainer(SecurityUtils.getSubject(), createdTimestamp, id, new HashMap<>());
for (final FileIdentifier fileId : filesList) {
final String entity_id = fileId.getEntityId();
final String path = fileId.getPath();
if (!entity_id.isBlank()) {
final Entity e = new RetrieveEntity(entity_id);
addBufferById(fileId, entity_id, e);
byIdContainer.add(e);
} else if (!path.isBlank()) {
final Entity e = new RetrieveEntity(0);
addBufferByPath(fileId, path, e);
final FileProperties fp = new FileProperties(null, path, null);
e.setFileProperties(fp);
byPathContainer.add(e);
}
}
// TODO wrap into one transaction
if (!byPathContainer.isEmpty()) {
final Transaction<?> tByPath = new RetrieveSparseEntityByPath(byPathContainer);
tByPath.execute();
}
if (!byIdContainer.isEmpty()) {
final Transaction<?> tById = new Retrieve(byIdContainer);
tById.execute();
}
} }
private void addBufferByPath( @Override
final FileIdentifier fileIdentifier, final String path, final Entity entity) { public void cleanUp() {
buffers.put("path:" + path, new DownloadBuffer(fileIdentifier, entity)); buffers.forEach(
(id, buffer) -> {
buffer.cleanUp();
});
} }
private void addBufferById( @Override
final FileIdentifier fileIdentifier, final String entity_id, final Entity entity) { public FileProperties getFile(final String fileId) {
buffers.put("entity_id:" + entity_id, new DownloadBuffer(fileIdentifier, entity)); return buffers.get(fileId).getFileProperties();
} }
@Override @Override
public void cleanUp() {} public FileTransmissionSettings getTransmissionSettings() {
return settings;
}
@Override public String append(final FileProperties fp) {
public FileProperties getFile(final String fileId) { final String id = Utils.getUID();
return buffers.get(fileId).getFileProperties(); buffers.put(id, new DownloadBuffer(fp));
return id;
} }
public Iterable<? extends FileDownloadHeader> getFileHeaders() { public FileDownloadResponse getNextChunk(final String fileId)
final List<FileDownloadHeader> result = new LinkedList<>(); throws FileNotFoundException, IOException {
buffers.forEach( return buffers.get(fileId).getNextChunk();
(file_id, buffer) -> {
result.add(buffer.getFileDownloadHeader());
});
return result;
} }
} }
package org.caosdb.server.grpc; package org.caosdb.server.grpc;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; import org.caosdb.api.entity.v1alpha1.FileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.FileIdentifier;
import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.api.entity.v1alpha1.FileTransmissionId;
import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings;
import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.utils.Utils; import org.caosdb.server.utils.Utils;
public class FileDownloadRegistration { public class FileDownloadRegistration {
private final Map<String, FileDownload> registeredDownloads = new HashMap<>(); private final Map<String, FileDownload> registeredDownloads = new HashMap<>();
public FileDownload registerFileDownload( public FileTransmissionId registerFileDownload(
final List<FileIdentifier> filesList, final long maxChunkSize, final long maxFileSize) final String registration_id, final FileProperties fp) {
final String file_id = registeredDownloads.get(registration_id).append(fp);
return FileTransmissionId.newBuilder()
.setRegistrationId(registration_id)
.setFileId(file_id)
.build();
}
public FileDownload registerFileDownload(final FileTransmissionSettings settings)
throws Exception { throws Exception {
final FileDownload result = new FileDownload(Utils.getUID(), filesList); final FileDownload result = new FileDownload(settings, Utils.getUID());
register(result); register(result);
return result; return result;
} }
...@@ -24,7 +34,10 @@ public class FileDownloadRegistration { ...@@ -24,7 +34,10 @@ public class FileDownloadRegistration {
registeredDownloads.put(fileTransmission.getId(), fileTransmission); registeredDownloads.put(fileTransmission.getId(), fileTransmission);
} }
public FileDownloadResponse downloadNextChunk(final FileTransmissionId fileTransmissionId) { public FileDownloadResponse downloadNextChunk(final FileTransmissionId fileTransmissionId)
return null; throws FileNotFoundException, IOException {
return registeredDownloads
.get(fileTransmissionId.getRegistrationId())
.getNextChunk(fileTransmissionId.getFileId());
} }
} }
package org.caosdb.server.grpc; package org.caosdb.server.grpc;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings;
import org.caosdb.api.entity.v1alpha1.RegistrationStatus; import org.caosdb.api.entity.v1alpha1.RegistrationStatus;
import org.caosdb.server.entity.FileProperties; import org.caosdb.server.entity.FileProperties;
...@@ -52,4 +53,6 @@ public abstract class FileTransmission { ...@@ -52,4 +53,6 @@ public abstract class FileTransmission {
} }
public abstract FileProperties getFile(final String fileId); public abstract FileProperties getFile(final String fileId);
public abstract FileTransmissionSettings getTransmissionSettings();
} }
package org.caosdb.server.grpc; package org.caosdb.server.grpc;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.caosdb.api.entity.v1alpha1.FileChunk; import org.caosdb.api.entity.v1alpha1.FileChunk;
import org.caosdb.api.entity.v1alpha1.FileDownloadRequest; import org.caosdb.api.entity.v1alpha1.FileDownloadRequest;
import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; import org.caosdb.api.entity.v1alpha1.FileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.api.entity.v1alpha1.FileTransmissionId;
import org.caosdb.api.entity.v1alpha1.FileTransmissionServiceGrpc.FileTransmissionServiceImplBase; import org.caosdb.api.entity.v1alpha1.FileTransmissionServiceGrpc.FileTransmissionServiceImplBase;
import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings;
import org.caosdb.api.entity.v1alpha1.FileUploadRequest; import org.caosdb.api.entity.v1alpha1.FileUploadRequest;
import org.caosdb.api.entity.v1alpha1.FileUploadResponse; import org.caosdb.api.entity.v1alpha1.FileUploadResponse;
import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadRequest;
import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.RegisterFileUploadRequest; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadRequest;
import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse;
import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder;
...@@ -19,9 +19,59 @@ import org.caosdb.server.entity.FileProperties; ...@@ -19,9 +19,59 @@ import org.caosdb.server.entity.FileProperties;
public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase { public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase {
public class FileUploadStreamObserver implements StreamObserver<FileUploadRequest> {
private FileUpload fileUpload = null;
private FileTransmissionId fileTransmissionId = null;
private final StreamObserver<FileUploadResponse> outputStreamObserver;
private TransmissionStatus status;
public FileUploadStreamObserver(final StreamObserver<FileUploadResponse> outputStreamObserver) {
this.outputStreamObserver = outputStreamObserver;
}
@Override
public void onError(final Throwable throwable) {}
@Override
public void onCompleted() {
fileUpload.cleanUp();
final FileUploadResponse response = FileUploadResponse.newBuilder().setStatus(status).build();
outputStreamObserver.onNext(response);
outputStreamObserver.onCompleted();
}
@Override
public void onNext(final FileUploadRequest request) {
final FileChunk chunk = request.getChunk();
if (chunk.hasFileTransmissionId()) {
fileUpload =
fileUploadRegistration.getFileUpload(chunk.getFileTransmissionId().getRegistrationId());
fileTransmissionId = chunk.getFileTransmissionId();
} else {
try {
status = fileUpload.uploadChunk(fileTransmissionId.getFileId(), chunk);
} catch (final IOException e) {
status = TransmissionStatus.TRANSMISSION_STATUS_ERROR;
e.printStackTrace();
}
}
}
}
FileUploadRegistration fileUploadRegistration = new FileUploadRegistration(); FileUploadRegistration fileUploadRegistration = new FileUploadRegistration();
FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration(); FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration();
FileTransmissionId registerFileDownload(final String registration_id, final FileProperties fp)
throws Exception {
return fileDownloadRegistration.registerFileDownload(registration_id, fp);
}
FileDownload registerFileDownload(final FileTransmissionSettings settings) throws Exception {
return fileDownloadRegistration.registerFileDownload(settings);
}
public FileProperties getUploadFile(final FileTransmissionId uploadId) { public FileProperties getUploadFile(final FileTransmissionId uploadId) {
return fileUploadRegistration.getUploadFile(uploadId); return fileUploadRegistration.getUploadFile(uploadId);
} }
...@@ -36,8 +86,7 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase ...@@ -36,8 +86,7 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase
builder.setStatus(result.getRegistrationStatus()); builder.setStatus(result.getRegistrationStatus());
if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) { if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) {
builder.setRegistrationId(result.getId()); builder.setRegistrationId(result.getId());
builder.setMaxChunkSize(result.getMaxChunkSize()); builder.setUploadSettings(result.getTransmissionSettings());
builder.setMaxFileSize(result.getMaxFileSize());
} }
final RegisterFileUploadResponse response = builder.build(); final RegisterFileUploadResponse response = builder.build();
...@@ -51,45 +100,9 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase ...@@ -51,45 +100,9 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase
} }
@Override @Override
public void fileUpload( public StreamObserver<FileUploadRequest> fileUpload(
final FileUploadRequest request, final StreamObserver<FileUploadResponse> responseObserver) { final StreamObserver<FileUploadResponse> responseObserver) {
try { return new FileUploadStreamObserver(responseObserver);
final FileChunk chunk = request.getChunk();
final TransmissionStatus status = fileUploadRegistration.uploadChunk(chunk);
final FileUploadResponse response = FileUploadResponse.newBuilder().setStatus(status).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (final Exception e) {
e.printStackTrace();
responseObserver.onError(e);
}
}
@Override
public void registerFileDownload(
final RegisterFileDownloadRequest request,
final StreamObserver<RegisterFileDownloadResponse> responseObserver) {
try {
final FileDownload result =
fileDownloadRegistration.registerFileDownload(
request.getFilesList(), request.getMaxChunkSize(), request.getMaxFileSize());
final RegisterFileDownloadResponse.Builder builder =
RegisterFileDownloadResponse.newBuilder();
builder.setStatus(result.getRegistrationStatus());
if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) {
builder.setRegistrationId(result.getId());
builder.addAllFiles(result.getFileHeaders());
}
final RegisterFileDownloadResponse response = builder.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (final Exception e) {
e.printStackTrace();
responseObserver.onError(e);
}
} }
@Override @Override
......
...@@ -5,6 +5,9 @@ import java.io.File; ...@@ -5,6 +5,9 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.caosdb.api.entity.v1alpha1.FileChunk;
import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings;
import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings.Builder;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus; import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.FileSystem; import org.caosdb.server.FileSystem;
import org.caosdb.server.entity.FileProperties; import org.caosdb.server.entity.FileProperties;
...@@ -57,4 +60,17 @@ public class FileUpload extends FileTransmission { ...@@ -57,4 +60,17 @@ public class FileUpload extends FileTransmission {
return null; return null;
} }
} }
@Override
public FileTransmissionSettings getTransmissionSettings() {
final Builder builder = FileTransmissionSettings.newBuilder();
builder.setMaxChunkSize(getMaxChunkSize());
builder.setMaxFileSize(getMaxFileSize());
return builder.build();
}
public TransmissionStatus uploadChunk(final String fileId, final FileChunk chunk)
throws IOException {
return upload(fileId, chunk.getData());
}
} }
package org.caosdb.server.grpc; package org.caosdb.server.grpc;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.caosdb.api.entity.v1alpha1.FileChunk;
import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.api.entity.v1alpha1.FileTransmissionId;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.entity.FileProperties; import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.utils.Utils; import org.caosdb.server.utils.Utils;
...@@ -31,11 +28,7 @@ public class FileUploadRegistration { ...@@ -31,11 +28,7 @@ public class FileUploadRegistration {
return fileTransmission.getFile(fileId); return fileTransmission.getFile(fileId);
} }
public TransmissionStatus uploadChunk(final FileChunk chunk) throws IOException { public FileUpload getFileUpload(final String registrationId) {
final String fileId = chunk.getFileTransmissionId().getFileId(); return registeredUploads.get(registrationId);
final String registrationId = chunk.getFileTransmissionId().getRegistrationId();
final FileUpload fileTransmission = registeredUploads.get(registrationId);
return fileTransmission.upload(fileId, chunk.getData());
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment