From 5800839e2a0c16d8201da507ebb50fbf589f0cce Mon Sep 17 00:00:00 2001 From: Timm Fitschen <t.fitschen@indiscale.com> Date: Mon, 9 Aug 2021 23:00:47 +0200 Subject: [PATCH] WIP:files --- caosdb-proto | 2 +- pom.xml | 2 +- .../caosdb/server/grpc/DownloadBuffer.java | 85 ++++++++------ .../grpc/EntityTransactionServiceImpl.java | 106 ++++++++++++------ .../org/caosdb/server/grpc/FileDownload.java | 85 +++++--------- .../server/grpc/FileDownloadRegistration.java | 27 +++-- .../caosdb/server/grpc/FileTransmission.java | 3 + .../grpc/FileTransmissionServiceImpl.java | 99 +++++++++------- .../org/caosdb/server/grpc/FileUpload.java | 16 +++ .../server/grpc/FileUploadRegistration.java | 11 +- 10 files changed, 247 insertions(+), 189 deletions(-) diff --git a/caosdb-proto b/caosdb-proto index d78d4c76..ab549416 160000 --- a/caosdb-proto +++ b/caosdb-proto @@ -1 +1 @@ -Subproject commit d78d4c76f5cd08dd418e1ab42183d1172cb0b383 +Subproject commit ab5494165946c032325a2d37dec1d563ffc8a959 diff --git a/pom.xml b/pom.xml index 1396ba39..d83023c3 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.caosdb</groupId> <artifactId>caosdb-server</artifactId> - <version>0.5.0-GRPC0.0.10</version> + <version>0.5.0-GRPC0.0.11</version> <packaging>jar</packaging> <name>CaosDB Server</name> <scm> diff --git a/src/main/java/org/caosdb/server/grpc/DownloadBuffer.java b/src/main/java/org/caosdb/server/grpc/DownloadBuffer.java index dded9903..11deea8e 100644 --- a/src/main/java/org/caosdb/server/grpc/DownloadBuffer.java +++ b/src/main/java/org/caosdb/server/grpc/DownloadBuffer.java @@ -1,55 +1,68 @@ package org.caosdb.server.grpc; -import org.caosdb.api.entity.v1alpha1.FileDescriptor; -import org.caosdb.api.entity.v1alpha1.FileDescriptor.Builder; -import org.caosdb.api.entity.v1alpha1.FileDownloadHeader; -import org.caosdb.api.entity.v1alpha1.FileIdentifier; -import org.caosdb.server.entity.EntityInterface; +import com.google.protobuf.ByteString; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import org.caosdb.api.entity.v1alpha1.FileChunk; +import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; +import org.caosdb.api.entity.v1alpha1.TransmissionStatus; import org.caosdb.server.entity.FileProperties; -import org.caosdb.server.entity.Message; public class DownloadBuffer { - private final EntityInterface entity; - private final FileIdentifier fileIdentifier; + private final FileProperties file_properties; + private FileInputStream fileInputStream; + private FileChannel fileChannel; - public DownloadBuffer(final FileIdentifier fileIdentifier, final EntityInterface entity) { - this.fileIdentifier = fileIdentifier; - this.entity = entity; + public DownloadBuffer(final FileProperties file_properties) { + this.file_properties = file_properties; + this.fileInputStream = null; } public FileProperties getFileProperties() { - return entity.getFileProperties(); + return file_properties; } - public FileDownloadHeader getFileDownloadHeader() { - final FileDownloadHeader.Builder builder = FileDownloadHeader.newBuilder(); - if (entity.hasMessage(Message.MessageType.Error.toString())) { - builder.addAllErrors( - EntityTransactionServiceImpl.convert( - entity.getMessages(Message.MessageType.Error.toString()))); + public FileDownloadResponse getNextChunk() throws FileNotFoundException, IOException { + if (fileChannel == null) { + fileInputStream = new FileInputStream(file_properties.getFile()); + fileChannel = fileInputStream.getChannel(); } - if (entity.hasMessage(Message.MessageType.Warning.toString())) { - builder.addAllWarnings( - EntityTransactionServiceImpl.convert( - entity.getMessages(Message.MessageType.Warning.toString()))); + final MappedByteBuffer map = + fileChannel.map(MapMode.READ_ONLY, fileChannel.position(), getChunkSize()); + + final FileChunk.Builder builder = FileChunk.newBuilder(); + builder.setData(ByteString.copyFrom(map)); + + final TransmissionStatus status; + if (fileInputStream.available() > 0) { + status = TransmissionStatus.TRANSMISSION_STATUS_GO_ON; + } else { + status = TransmissionStatus.TRANSMISSION_STATUS_SUCCESS; + cleanUp(); } - if (entity.hasMessage(Message.MessageType.Info.toString())) { - builder.addAllInfos( - EntityTransactionServiceImpl.convert( - entity.getMessages(Message.MessageType.Info.toString()))); + return FileDownloadResponse.newBuilder().setChunk(builder).setStatus(status).build(); + } + + public void cleanUp() { + try { + if (fileChannel != null && fileChannel.isOpen()) { + fileChannel.close(); + } + if (fileInputStream != null) { + fileInputStream.close(); + } + } catch (final IOException e) { + e.printStackTrace(); } - builder.setId(fileIdentifier); - builder.setFileDescriptor(convert(entity.getFileProperties())); - return builder.build(); } - private FileDescriptor convert(final FileProperties fileProperties) { - final Builder builder = FileDescriptor.newBuilder(); - builder.setEntityId(entity.getId().toString()); - builder.setPath(entity.getFileProperties().getPath()); - builder.setSize(entity.getFileProperties().getSize()); - // TODO Hash - return builder.build(); + private int getChunkSize() { + // 16kB + return 16384; } } diff --git a/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java b/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java index 4a334ca1..e374cc44 100644 --- a/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java +++ b/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java @@ -8,21 +8,27 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; import org.apache.shiro.SecurityUtils; +import org.caosdb.api.entity.v1alpha1.DeleteRequest; +import org.caosdb.api.entity.v1alpha1.DeleteResponse; import org.caosdb.api.entity.v1alpha1.Entity; import org.caosdb.api.entity.v1alpha1.Entity.Builder; import org.caosdb.api.entity.v1alpha1.EntityRequest; +import org.caosdb.api.entity.v1alpha1.EntityResponse; import org.caosdb.api.entity.v1alpha1.EntityTransactionServiceGrpc.EntityTransactionServiceImplBase; import org.caosdb.api.entity.v1alpha1.FileDescriptor; import org.caosdb.api.entity.v1alpha1.IdResponse; +import org.caosdb.api.entity.v1alpha1.InsertRequest; +import org.caosdb.api.entity.v1alpha1.InsertResponse; import org.caosdb.api.entity.v1alpha1.MessageCode; import org.caosdb.api.entity.v1alpha1.MultiTransactionRequest; import org.caosdb.api.entity.v1alpha1.MultiTransactionResponse; import org.caosdb.api.entity.v1alpha1.Parent; -import org.caosdb.api.entity.v1alpha1.QueryOrIdRequest; import org.caosdb.api.entity.v1alpha1.RetrieveResponse; import org.caosdb.api.entity.v1alpha1.TransactionRequest; import org.caosdb.api.entity.v1alpha1.TransactionRequest.WrappedRequestsCase; import org.caosdb.api.entity.v1alpha1.TransactionResponse; +import org.caosdb.api.entity.v1alpha1.UpdateRequest; +import org.caosdb.api.entity.v1alpha1.UpdateResponse; import org.caosdb.api.entity.v1alpha1.Version; import org.caosdb.server.datatype.GenericValue; import org.caosdb.server.datatype.Value; @@ -64,39 +70,44 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa return null; } - public Entity convert(final EntityInterface from) { - final Builder builder = Entity.newBuilder(); + public EntityResponse.Builder convert(final EntityInterface from) { + + final Builder entityBuilder = Entity.newBuilder(); if (from.getRole() != null) { - builder.setRole(from.getRole().toString()); + entityBuilder.setRole(from.getRole().toString()); } if (from.hasId()) { - builder.setId(from.getId().toString()); + entityBuilder.setId(from.getId().toString()); } if (from.hasVersion()) { - builder.setVersion(convert(from.getVersion())); + entityBuilder.setVersion(convert(from.getVersion())); } if (from.hasName()) { - builder.setName(from.getName()); + entityBuilder.setName(from.getName()); } if (from.hasDescription()) { - builder.setDescription(from.getDescription()); + entityBuilder.setDescription(from.getDescription()); } if (from.hasDatatype()) { - builder.setDatatype(from.getDatatype().getName()); + entityBuilder.setDatatype(from.getDatatype().getName()); } if (from.hasUnit()) { - builder.setUnit(getStringUnit(from)); + entityBuilder.setUnit(getStringUnit(from)); } if (from.hasProperties()) { - builder.addAllProperties(convert(from.getProperties())); + entityBuilder.addAllProperties(convert(from.getProperties())); } if (from.hasParents()) { - builder.addAllParents(convert(from.getParents())); + entityBuilder.addAllParents(convert(from.getParents())); } - appendMessages(from, builder); - return builder.build(); + final EntityResponse.Builder responseBuilder = EntityResponse.newBuilder(); + responseBuilder.setEntity(entityBuilder); + + appendMessages(from, responseBuilder); + + return responseBuilder; } public static Iterable<? extends org.caosdb.api.entity.v1alpha1.Message> convert( @@ -223,17 +234,25 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa final RetrieveContainer container = new RetrieveContainer( SecurityUtils.getSubject(), getTimestamp(), getSRID(), new HashMap<>()); + FileDownload file_download = null; for (final TransactionRequest sub_request : request.getRequestsList()) { + final boolean fileDownload = sub_request.getRetrieveRequest().getRegisterFileDownload(); if (sub_request.getRetrieveRequest().hasQuery() && !sub_request.getRetrieveRequest().getQuery().getQuery().isBlank()) { final String query = sub_request.getRetrieveRequest().getQuery().getQuery(); container.getFlags().put("query", query); + if (fileDownload) { + container.getFlags().put("download_files", "true"); + } } else { final String id = sub_request.getRetrieveRequest().getId(); if (!id.isBlank()) { try { final RetrieveEntity entity = new RetrieveEntity(getId(id)); + if (fileDownload) { + entity.setFlag("download_files", "true"); + } container.add(entity); } catch (final NumberFormatException e) { // We handle this after the retrieval @@ -250,10 +269,21 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa .addResponsesBuilder() .setRetrieveResponse(RetrieveResponse.newBuilder().setCountResult(count)); } else { + final boolean download_files_container = container.getFlags().containsKey("download_files"); for (final EntityInterface entity : container) { + final EntityResponse.Builder entityResponse = convert(entity); + if ((download_files_container || entity.getFlags().containsKey("download_files")) + && entity.hasFileProperties()) { + if (file_download == null) { + file_download = fileTransmissionService.registerFileDownload(null); + } + entityResponse.setDownloadId( + fileTransmissionService.registerFileDownload( + file_download.getId(), entity.getFileProperties())); + } builder .addResponsesBuilder() - .setRetrieveResponse(RetrieveResponse.newBuilder().setEntity(convert(entity))); + .setRetrieveResponse(RetrieveResponse.newBuilder().setEntityResponse(entityResponse)); } } @@ -270,17 +300,17 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa builder.addResponses( TransactionResponse.newBuilder() .setRetrieveResponse( - RetrieveResponse.newBuilder().setEntity(entityDoesNotExist(id)))); + RetrieveResponse.newBuilder().setEntityResponse(entityDoesNotExist(id)))); } } } return builder.build(); } - private Entity entityDoesNotExist(final String id) { - return Entity.newBuilder() + private EntityResponse entityDoesNotExist(final String id) { + return EntityResponse.newBuilder() .addErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)) - .setId(id) + .setEntity(Entity.newBuilder().setId(id)) .build(); } @@ -331,7 +361,7 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa } } - private IdResponse delete(final QueryOrIdRequest deleteRequest) throws Exception { + private DeleteResponse delete(final DeleteRequest deleteRequest) throws Exception { final String id = deleteRequest.getId(); final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder = IdResponse.newBuilder(); @@ -349,36 +379,44 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa } catch (final NumberFormatException e) { // ID wasn't an integer - the server doesn't support string id's yet, so that entity cannot // exist. - builder.addEntityErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)); + builder.addErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)); } - return builder.build(); + return DeleteResponse.newBuilder().setIdResponse(builder).build(); } - private IdResponse insert(final EntityRequest entityRequest) throws Exception { + private InsertResponse insert(final InsertRequest insertRequest) throws Exception { + final EntityRequest entityRequest = insertRequest.getEntityRequest(); final Entity insertEntity = entityRequest.getEntity(); final InsertEntity entity = new InsertEntity( insertEntity.getName().isEmpty() ? null : insertEntity.getName(), Role.parse(insertEntity.getRole())); - return write(convert(insertEntity, entity), entityRequest); + return InsertResponse.newBuilder() + .setIdResponse(write(convert(insertEntity, entity), entityRequest)) + .build(); } - private IdResponse update(final EntityRequest entityRequest) throws Exception { + private UpdateResponse update(final UpdateRequest updateRequest) throws Exception { + final EntityRequest entityRequest = updateRequest.getEntityRequest(); final Entity updateEntity = entityRequest.getEntity(); final String id = updateEntity.getId(); try { final UpdateEntity entity = new UpdateEntity(getId(id), Role.parse(updateEntity.getRole())); entity.setName(updateEntity.getName().isEmpty() ? null : updateEntity.getName()); - return write(convert(updateEntity, entity), entityRequest); + return UpdateResponse.newBuilder() + .setIdResponse(write(convert(updateEntity, entity), entityRequest)) + .build(); } catch (final NumberFormatException e) { // ID wasn't an integer - the server doesn't support string id's yet, so that entity cannot // exist. - return IdResponse.newBuilder() - .setId(id) - .addEntityErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST)) + return UpdateResponse.newBuilder() + .setIdResponse( + IdResponse.newBuilder() + .setId(id) + .addErrors(convert(ServerMessages.ENTITY_DOES_NOT_EXIST))) .build(); } } @@ -407,7 +445,8 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa } private void appendMessages( - final EntityInterface from, final org.caosdb.api.entity.v1alpha1.Entity.Builder builder) { + final EntityInterface from, + final org.caosdb.api.entity.v1alpha1.EntityResponse.Builder builder) { if (from.hasMessage(Message.MessageType.Error.toString())) { builder.addAllErrors(convert(from.getMessages(Message.MessageType.Error.toString()))); } @@ -422,14 +461,13 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa private void appendMessages( final EntityInterface from, final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder) { if (from.hasMessage(Message.MessageType.Error.toString())) { - builder.addAllEntityErrors(convert(from.getMessages(Message.MessageType.Error.toString()))); + builder.addAllErrors(convert(from.getMessages(Message.MessageType.Error.toString()))); } if (from.hasMessage(Message.MessageType.Warning.toString())) { - builder.addAllEntityWarnings( - convert(from.getMessages(Message.MessageType.Warning.toString()))); + builder.addAllWarnings(convert(from.getMessages(Message.MessageType.Warning.toString()))); } if (from.hasMessage(Message.MessageType.Info.toString())) { - builder.addAllEntityInfos(convert(from.getMessages(Message.MessageType.Info.toString()))); + builder.addAllInfos(convert(from.getMessages(Message.MessageType.Info.toString()))); } } diff --git a/src/main/java/org/caosdb/server/grpc/FileDownload.java b/src/main/java/org/caosdb/server/grpc/FileDownload.java index 297dcaba..d17f05c1 100644 --- a/src/main/java/org/caosdb/server/grpc/FileDownload.java +++ b/src/main/java/org/caosdb/server/grpc/FileDownload.java @@ -1,81 +1,50 @@ package org.caosdb.server.grpc; +import java.io.FileNotFoundException; +import java.io.IOException; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; -import org.apache.shiro.SecurityUtils; -import org.caosdb.api.entity.v1alpha1.FileDownloadHeader; -import org.caosdb.api.entity.v1alpha1.FileIdentifier; -import org.caosdb.server.entity.Entity; +import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; +import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings; import org.caosdb.server.entity.FileProperties; -import org.caosdb.server.entity.RetrieveEntity; -import org.caosdb.server.entity.container.RetrieveContainer; -import org.caosdb.server.entity.container.TransactionContainer; -import org.caosdb.server.transaction.Retrieve; -import org.caosdb.server.transaction.RetrieveSparseEntityByPath; -import org.caosdb.server.transaction.Transaction; +import org.caosdb.server.utils.Utils; public class FileDownload extends FileTransmission { Map<String, DownloadBuffer> buffers = new HashMap<>(); + private final FileTransmissionSettings settings; - public FileDownload(final String id, final List<FileIdentifier> filesList) throws Exception { + public FileDownload(final FileTransmissionSettings settings, final String id) throws Exception { super(id); - final TransactionContainer byPathContainer = new TransactionContainer(); - final RetrieveContainer byIdContainer = - new RetrieveContainer(SecurityUtils.getSubject(), createdTimestamp, id, new HashMap<>()); - - for (final FileIdentifier fileId : filesList) { - final String entity_id = fileId.getEntityId(); - final String path = fileId.getPath(); - if (!entity_id.isBlank()) { - final Entity e = new RetrieveEntity(entity_id); - addBufferById(fileId, entity_id, e); - byIdContainer.add(e); - } else if (!path.isBlank()) { - final Entity e = new RetrieveEntity(0); - addBufferByPath(fileId, path, e); - final FileProperties fp = new FileProperties(null, path, null); - e.setFileProperties(fp); - byPathContainer.add(e); - } - } - // TODO wrap into one transaction - if (!byPathContainer.isEmpty()) { - final Transaction<?> tByPath = new RetrieveSparseEntityByPath(byPathContainer); - tByPath.execute(); - } - if (!byIdContainer.isEmpty()) { - final Transaction<?> tById = new Retrieve(byIdContainer); - tById.execute(); - } + this.settings = settings; } - private void addBufferByPath( - final FileIdentifier fileIdentifier, final String path, final Entity entity) { - buffers.put("path:" + path, new DownloadBuffer(fileIdentifier, entity)); + @Override + public void cleanUp() { + buffers.forEach( + (id, buffer) -> { + buffer.cleanUp(); + }); } - private void addBufferById( - final FileIdentifier fileIdentifier, final String entity_id, final Entity entity) { - buffers.put("entity_id:" + entity_id, new DownloadBuffer(fileIdentifier, entity)); + @Override + public FileProperties getFile(final String fileId) { + return buffers.get(fileId).getFileProperties(); } @Override - public void cleanUp() {} + public FileTransmissionSettings getTransmissionSettings() { + return settings; + } - @Override - public FileProperties getFile(final String fileId) { - return buffers.get(fileId).getFileProperties(); + public String append(final FileProperties fp) { + final String id = Utils.getUID(); + buffers.put(id, new DownloadBuffer(fp)); + return id; } - public Iterable<? extends FileDownloadHeader> getFileHeaders() { - final List<FileDownloadHeader> result = new LinkedList<>(); - buffers.forEach( - (file_id, buffer) -> { - result.add(buffer.getFileDownloadHeader()); - }); - return result; + public FileDownloadResponse getNextChunk(final String fileId) + throws FileNotFoundException, IOException { + return buffers.get(fileId).getNextChunk(); } } diff --git a/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java index f9052c19..b5c1e3ef 100644 --- a/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java +++ b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java @@ -1,21 +1,31 @@ package org.caosdb.server.grpc; +import java.io.FileNotFoundException; +import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; -import org.caosdb.api.entity.v1alpha1.FileIdentifier; import org.caosdb.api.entity.v1alpha1.FileTransmissionId; +import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings; +import org.caosdb.server.entity.FileProperties; import org.caosdb.server.utils.Utils; public class FileDownloadRegistration { private final Map<String, FileDownload> registeredDownloads = new HashMap<>(); - public FileDownload registerFileDownload( - final List<FileIdentifier> filesList, final long maxChunkSize, final long maxFileSize) + public FileTransmissionId registerFileDownload( + final String registration_id, final FileProperties fp) { + final String file_id = registeredDownloads.get(registration_id).append(fp); + return FileTransmissionId.newBuilder() + .setRegistrationId(registration_id) + .setFileId(file_id) + .build(); + } + + public FileDownload registerFileDownload(final FileTransmissionSettings settings) throws Exception { - final FileDownload result = new FileDownload(Utils.getUID(), filesList); + final FileDownload result = new FileDownload(settings, Utils.getUID()); register(result); return result; } @@ -24,7 +34,10 @@ public class FileDownloadRegistration { registeredDownloads.put(fileTransmission.getId(), fileTransmission); } - public FileDownloadResponse downloadNextChunk(final FileTransmissionId fileTransmissionId) { - return null; + public FileDownloadResponse downloadNextChunk(final FileTransmissionId fileTransmissionId) + throws FileNotFoundException, IOException { + return registeredDownloads + .get(fileTransmissionId.getRegistrationId()) + .getNextChunk(fileTransmissionId.getFileId()); } } diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmission.java b/src/main/java/org/caosdb/server/grpc/FileTransmission.java index 65769e2e..09631b4e 100644 --- a/src/main/java/org/caosdb/server/grpc/FileTransmission.java +++ b/src/main/java/org/caosdb/server/grpc/FileTransmission.java @@ -1,6 +1,7 @@ package org.caosdb.server.grpc; import java.util.concurrent.locks.ReentrantLock; +import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings; import org.caosdb.api.entity.v1alpha1.RegistrationStatus; import org.caosdb.server.entity.FileProperties; @@ -52,4 +53,6 @@ public abstract class FileTransmission { } public abstract FileProperties getFile(final String fileId); + + public abstract FileTransmissionSettings getTransmissionSettings(); } diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java index e50ba2d4..69533bcc 100644 --- a/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java +++ b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java @@ -1,15 +1,15 @@ package org.caosdb.server.grpc; import io.grpc.stub.StreamObserver; +import java.io.IOException; import org.caosdb.api.entity.v1alpha1.FileChunk; import org.caosdb.api.entity.v1alpha1.FileDownloadRequest; import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.api.entity.v1alpha1.FileTransmissionServiceGrpc.FileTransmissionServiceImplBase; +import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings; import org.caosdb.api.entity.v1alpha1.FileUploadRequest; import org.caosdb.api.entity.v1alpha1.FileUploadResponse; -import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadRequest; -import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadResponse; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadRequest; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder; @@ -19,9 +19,59 @@ import org.caosdb.server.entity.FileProperties; public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase { + public class FileUploadStreamObserver implements StreamObserver<FileUploadRequest> { + + private FileUpload fileUpload = null; + private FileTransmissionId fileTransmissionId = null; + private final StreamObserver<FileUploadResponse> outputStreamObserver; + private TransmissionStatus status; + + public FileUploadStreamObserver(final StreamObserver<FileUploadResponse> outputStreamObserver) { + this.outputStreamObserver = outputStreamObserver; + } + + @Override + public void onError(final Throwable throwable) {} + + @Override + public void onCompleted() { + fileUpload.cleanUp(); + final FileUploadResponse response = FileUploadResponse.newBuilder().setStatus(status).build(); + outputStreamObserver.onNext(response); + outputStreamObserver.onCompleted(); + } + + @Override + public void onNext(final FileUploadRequest request) { + final FileChunk chunk = request.getChunk(); + if (chunk.hasFileTransmissionId()) { + fileUpload = + fileUploadRegistration.getFileUpload(chunk.getFileTransmissionId().getRegistrationId()); + fileTransmissionId = chunk.getFileTransmissionId(); + } else { + + try { + status = fileUpload.uploadChunk(fileTransmissionId.getFileId(), chunk); + } catch (final IOException e) { + status = TransmissionStatus.TRANSMISSION_STATUS_ERROR; + e.printStackTrace(); + } + } + } + } + FileUploadRegistration fileUploadRegistration = new FileUploadRegistration(); FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration(); + FileTransmissionId registerFileDownload(final String registration_id, final FileProperties fp) + throws Exception { + return fileDownloadRegistration.registerFileDownload(registration_id, fp); + } + + FileDownload registerFileDownload(final FileTransmissionSettings settings) throws Exception { + return fileDownloadRegistration.registerFileDownload(settings); + } + public FileProperties getUploadFile(final FileTransmissionId uploadId) { return fileUploadRegistration.getUploadFile(uploadId); } @@ -36,8 +86,7 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase builder.setStatus(result.getRegistrationStatus()); if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) { builder.setRegistrationId(result.getId()); - builder.setMaxChunkSize(result.getMaxChunkSize()); - builder.setMaxFileSize(result.getMaxFileSize()); + builder.setUploadSettings(result.getTransmissionSettings()); } final RegisterFileUploadResponse response = builder.build(); @@ -51,45 +100,9 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase } @Override - public void fileUpload( - final FileUploadRequest request, final StreamObserver<FileUploadResponse> responseObserver) { - try { - final FileChunk chunk = request.getChunk(); - - final TransmissionStatus status = fileUploadRegistration.uploadChunk(chunk); - final FileUploadResponse response = FileUploadResponse.newBuilder().setStatus(status).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (final Exception e) { - e.printStackTrace(); - responseObserver.onError(e); - } - } - - @Override - public void registerFileDownload( - final RegisterFileDownloadRequest request, - final StreamObserver<RegisterFileDownloadResponse> responseObserver) { - try { - final FileDownload result = - fileDownloadRegistration.registerFileDownload( - request.getFilesList(), request.getMaxChunkSize(), request.getMaxFileSize()); - final RegisterFileDownloadResponse.Builder builder = - RegisterFileDownloadResponse.newBuilder(); - builder.setStatus(result.getRegistrationStatus()); - if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) { - builder.setRegistrationId(result.getId()); - builder.addAllFiles(result.getFileHeaders()); - } - - final RegisterFileDownloadResponse response = builder.build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - - } catch (final Exception e) { - e.printStackTrace(); - responseObserver.onError(e); - } + public StreamObserver<FileUploadRequest> fileUpload( + final StreamObserver<FileUploadResponse> responseObserver) { + return new FileUploadStreamObserver(responseObserver); } @Override diff --git a/src/main/java/org/caosdb/server/grpc/FileUpload.java b/src/main/java/org/caosdb/server/grpc/FileUpload.java index dc43227e..eedca00d 100644 --- a/src/main/java/org/caosdb/server/grpc/FileUpload.java +++ b/src/main/java/org/caosdb/server/grpc/FileUpload.java @@ -5,6 +5,9 @@ import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import org.caosdb.api.entity.v1alpha1.FileChunk; +import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings; +import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings.Builder; import org.caosdb.api.entity.v1alpha1.TransmissionStatus; import org.caosdb.server.FileSystem; import org.caosdb.server.entity.FileProperties; @@ -57,4 +60,17 @@ public class FileUpload extends FileTransmission { return null; } } + + @Override + public FileTransmissionSettings getTransmissionSettings() { + final Builder builder = FileTransmissionSettings.newBuilder(); + builder.setMaxChunkSize(getMaxChunkSize()); + builder.setMaxFileSize(getMaxFileSize()); + return builder.build(); + } + + public TransmissionStatus uploadChunk(final String fileId, final FileChunk chunk) + throws IOException { + return upload(fileId, chunk.getData()); + } } diff --git a/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java index 074c201c..5612d08f 100644 --- a/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java +++ b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java @@ -1,11 +1,8 @@ package org.caosdb.server.grpc; -import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.caosdb.api.entity.v1alpha1.FileChunk; import org.caosdb.api.entity.v1alpha1.FileTransmissionId; -import org.caosdb.api.entity.v1alpha1.TransmissionStatus; import org.caosdb.server.entity.FileProperties; import org.caosdb.server.utils.Utils; @@ -31,11 +28,7 @@ public class FileUploadRegistration { return fileTransmission.getFile(fileId); } - public TransmissionStatus uploadChunk(final FileChunk chunk) throws IOException { - final String fileId = chunk.getFileTransmissionId().getFileId(); - final String registrationId = chunk.getFileTransmissionId().getRegistrationId(); - - final FileUpload fileTransmission = registeredUploads.get(registrationId); - return fileTransmission.upload(fileId, chunk.getData()); + public FileUpload getFileUpload(final String registrationId) { + return registeredUploads.get(registrationId); } } -- GitLab