diff --git a/caosdb-proto b/caosdb-proto index daf705ef4e86c30f96b3aac429a66667f50c7b77..e6188445188a88c16c9a603060b40334e509ece6 160000 --- a/caosdb-proto +++ b/caosdb-proto @@ -1 +1 @@ -Subproject commit daf705ef4e86c30f96b3aac429a66667f50c7b77 +Subproject commit e6188445188a88c16c9a603060b40334e509ece6 diff --git a/pom.xml b/pom.xml index 35c1b840653127e467417af784101d41f611ebed..9a6afbb5a1b81caba9daec334df0f57f73bbc925 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.caosdb</groupId> <artifactId>caosdb-server</artifactId> - <version>0.5.0-GRPC0.0.9</version> + <version>0.5.0-GRPC0.0.10</version> <packaging>jar</packaging> <name>CaosDB Server</name> <scm> diff --git a/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java b/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java index cfec86466e4893c11c7936350a5fc9f79cd2e02e..349a2f4250890f2d5d7f2fe24a9ce5c52310eaf9 100644 --- a/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java +++ b/src/main/java/org/caosdb/server/grpc/EntityTransactionServiceImpl.java @@ -9,6 +9,7 @@ import java.util.UUID; import org.apache.shiro.SecurityUtils; import org.caosdb.api.entity.v1alpha1.Entity; import org.caosdb.api.entity.v1alpha1.Entity.Builder; +import org.caosdb.api.entity.v1alpha1.EntityRequest; import org.caosdb.api.entity.v1alpha1.EntityTransactionServiceGrpc.EntityTransactionServiceImplBase; import org.caosdb.api.entity.v1alpha1.IdResponse; import org.caosdb.api.entity.v1alpha1.MessageCode; @@ -25,6 +26,7 @@ import org.caosdb.server.datatype.GenericValue; import org.caosdb.server.datatype.Value; import org.caosdb.server.entity.DeleteEntity; import org.caosdb.server.entity.EntityInterface; +import org.caosdb.server.entity.FileProperties; import org.caosdb.server.entity.InsertEntity; import org.caosdb.server.entity.MagicTypes; import org.caosdb.server.entity.Message; @@ -43,6 +45,12 @@ import org.caosdb.server.utils.ServerMessages; public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBase { + public EntityTransactionServiceImpl(final FileTransmissionServiceImpl fileTransmissionService) { + this.fileTransmissionService = fileTransmissionService; + } + + private final FileTransmissionServiceImpl fileTransmissionService; + public String getStringUnit(final EntityInterface entity) { for (final Property p : entity.getProperties()) { if (MagicTypes.UNIT.getId() == p.getId()) { @@ -319,12 +327,17 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa return builder.build(); } - private IdResponse insert(final Entity insertEntity) throws Exception { + private IdResponse insert(final EntityRequest entityRequest) throws Exception { final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder = IdResponse.newBuilder(); final WritableContainer container = new WritableContainer(SecurityUtils.getSubject(), getTimestamp(), getSRID(), null); - final InsertEntity entity = convert(insertEntity); + final InsertEntity entity = convert(entityRequest.getEntity()); + if (entityRequest.hasUploadId()) { + final FileProperties uploadFile = + fileTransmissionService.getUploadFile(entityRequest.getUploadId()); + container.addFile(uploadFile.getTmpIdentifyer(), uploadFile); + } container.add(entity); final WriteTransaction transaction = new WriteTransaction(container); transaction.execute(); diff --git a/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java new file mode 100644 index 0000000000000000000000000000000000000000..9da9be19beaae663e5544ef7489a92f8d750883f --- /dev/null +++ b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java @@ -0,0 +1,19 @@ +package org.caosdb.server.grpc; + +import java.util.List; +import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; +import org.caosdb.api.entity.v1alpha1.FileIdentifier; + +public class FileDownloadRegistration { + + public FileTransmission registerFileDownload( + final List<FileIdentifier> filesList, final long maxChunkSize, final long maxFileSize) { + // TODO Auto-generated method stub + return null; + } + + public FileDownloadResponse downloadNextChunk(final String registrationId) { + // TODO Auto-generated method stub + return null; + } +} diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmission.java b/src/main/java/org/caosdb/server/grpc/FileTransmission.java new file mode 100644 index 0000000000000000000000000000000000000000..fd36007cf9c15945a05acaddef199afdac163ff6 --- /dev/null +++ b/src/main/java/org/caosdb/server/grpc/FileTransmission.java @@ -0,0 +1,100 @@ +package org.caosdb.server.grpc; + +import com.google.protobuf.ByteString; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import org.caosdb.api.entity.v1alpha1.RegistrationStatus; +import org.caosdb.api.entity.v1alpha1.TransmissionStatus; +import org.caosdb.server.FileSystem; +import org.caosdb.server.entity.FileProperties; + +public class FileTransmission { + + ReentrantLock lock = new ReentrantLock(); + + Map<String, UploadBuffer> buffers = new HashMap<>(); + + private final String id; + private File tmpDir; + private final long createdTimestamp; + private long touchedTimestamp; + private final RegistrationStatus status; + + public FileTransmission(final String id) { + this.status = RegistrationStatus.REGISTRATION_STATUS_ACCEPTED; + this.createdTimestamp = System.currentTimeMillis(); + this.touchedTimestamp = createdTimestamp; + this.tmpDir = null; + this.id = id; + } + + public void cleanUp() { + if (tmpDir != null) { + org.apache.commons.io.FileUtils.deleteQuietly(tmpDir); + } + } + + private File getTmpDir() { + if (tmpDir == null) { + tmpDir = new File(FileSystem.getTmp() + id + "/"); + tmpDir.mkdirs(); + } + return tmpDir; + } + + public long getCreatedTimestamp() { + return createdTimestamp; + } + + public long getTouchedTimestamp() { + return this.touchedTimestamp; + } + + public void touch() { + this.touchedTimestamp = System.currentTimeMillis(); + } + + public String getId() { + return this.id; + } + + public RegistrationStatus getRegistrationStatus() { + return status; + } + + public long getMaxChunkSize() { + // 2^24, 16.78 MB + return 16777216; + } + + public long getMaxFileSize() { + // 2^30, 1.074 GB + return 1073741824; + } + + public TransmissionStatus upload(final String fileId, final int chunkNo, final ByteString data) + throws IOException { + synchronized (lock) { + return getFileBuffer(fileId).write(data); + } + } + + private UploadBuffer getFileBuffer(final String fileId) { + if (!buffers.containsKey(fileId)) { + buffers.put(fileId, new UploadBuffer(getTmpDir())); + } + return buffers.get(fileId); + } + + public FileProperties getFile(final String fileId) { + synchronized (lock) { + if (buffers.containsKey(fileId)) { + return buffers.get(fileId).toFileProperties(); + } + return null; + } + } +} diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..5f3c18822a91af723d60b4651a4f15390687c5df --- /dev/null +++ b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java @@ -0,0 +1,109 @@ +package org.caosdb.server.grpc; + +import io.grpc.stub.StreamObserver; +import org.caosdb.api.entity.v1alpha1.FileChunk; +import org.caosdb.api.entity.v1alpha1.FileDownloadRequest; +import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; +import org.caosdb.api.entity.v1alpha1.FileTransmissionId; +import org.caosdb.api.entity.v1alpha1.FileTransmissionServiceGrpc.FileTransmissionServiceImplBase; +import org.caosdb.api.entity.v1alpha1.FileUploadRequest; +import org.caosdb.api.entity.v1alpha1.FileUploadResponse; +import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadRequest; +import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadResponse; +import org.caosdb.api.entity.v1alpha1.RegisterFileUploadRequest; +import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse; +import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder; +import org.caosdb.api.entity.v1alpha1.RegistrationStatus; +import org.caosdb.api.entity.v1alpha1.TransmissionStatus; +import org.caosdb.server.entity.FileProperties; + +public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase { + + FileUploadRegistration fileUploadRegistration = new FileUploadRegistration(); + FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration(); + + public FileProperties getUploadFile(final FileTransmissionId uploadId) { + return fileUploadRegistration.getUploadFile(uploadId); + } + + @Override + public void registerFileUpload( + final RegisterFileUploadRequest request, + final StreamObserver<RegisterFileUploadResponse> responseObserver) { + try { + final FileTransmission result = fileUploadRegistration.registerFileUpload(); + final Builder builder = RegisterFileUploadResponse.newBuilder(); + builder.setStatus(result.getRegistrationStatus()); + if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) { + builder.setRegistrationId(result.getId()); + builder.setMaxChunkSize(result.getMaxChunkSize()); + builder.setMaxFileSize(result.getMaxFileSize()); + } + + final RegisterFileUploadResponse response = builder.build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + + } catch (final Exception e) { + e.printStackTrace(); + responseObserver.onError(e); + } + } + + @Override + public void fileUpload( + final FileUploadRequest request, final StreamObserver<FileUploadResponse> responseObserver) { + try { + final FileChunk chunk = request.getChunk(); + + final TransmissionStatus status = fileUploadRegistration.uploadChunk(chunk); + final FileUploadResponse response = FileUploadResponse.newBuilder().setStatus(status).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (final Exception e) { + e.printStackTrace(); + responseObserver.onError(e); + } + } + + @Override + public void registerFileDownload( + final RegisterFileDownloadRequest request, + final StreamObserver<RegisterFileDownloadResponse> responseObserver) { + try { + final FileTransmission result = + fileDownloadRegistration.registerFileDownload( + request.getFilesList(), request.getMaxChunkSize(), request.getMaxFileSize()); + final RegisterFileDownloadResponse.Builder builder = + RegisterFileDownloadResponse.newBuilder(); + builder.setStatus(result.getRegistrationStatus()); + if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) { + builder.setRegistrationId(result.getId()); + } + + final RegisterFileDownloadResponse response = builder.build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + + } catch (final Exception e) { + e.printStackTrace(); + responseObserver.onError(e); + } + } + + @Override + public void fileDownload( + final FileDownloadRequest request, + final StreamObserver<FileDownloadResponse> responseObserver) { + try { + final FileDownloadResponse response = + fileDownloadRegistration.downloadNextChunk(request.getRegistrationId()); + + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (final Exception e) { + e.printStackTrace(); + responseObserver.onError(e); + } + } +} diff --git a/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java new file mode 100644 index 0000000000000000000000000000000000000000..29d0f95508d991bbb310adad58032e4ded42565b --- /dev/null +++ b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java @@ -0,0 +1,40 @@ +package org.caosdb.server.grpc; + +import java.util.HashMap; +import java.util.Map; +import org.caosdb.api.entity.v1alpha1.FileChunk; +import org.caosdb.api.entity.v1alpha1.FileTransmissionId; +import org.caosdb.api.entity.v1alpha1.TransmissionStatus; +import org.caosdb.server.entity.FileProperties; +import org.caosdb.server.utils.Utils; + +public class FileUploadRegistration { + + private final Map<String, FileTransmission> registeredUploads = new HashMap<>(); + + public FileTransmission registerFileUpload() { + final FileTransmission result = new FileTransmission(Utils.getUID()); + register(result); + return result; + } + + private void register(final FileTransmission fileTransmission) { + registeredUploads.put(fileTransmission.getId(), fileTransmission); + } + + public FileProperties getUploadFile(final FileTransmissionId uploadId) { + final String fileId = uploadId.getFileId(); + final String registrationId = uploadId.getRegistrationId(); + final FileTransmission fileTransmission = registeredUploads.get(registrationId); + + return fileTransmission.getFile(fileId); + } + + public TransmissionStatus uploadChunk(final FileChunk chunk) { + final String fileId = chunk.getId().getFileTransmissionId().getFileId(); + final String registrationId = chunk.getId().getFileTransmissionId().getRegistrationId(); + + final FileTransmission fileTransmission = registeredUploads.get(registrationId); + return fileTransmission.upload(fileId, chunkNo, chunk.getData()); + } +} diff --git a/src/main/java/org/caosdb/server/grpc/GRPCServer.java b/src/main/java/org/caosdb/server/grpc/GRPCServer.java index 0fd6da7861c6a1f213acc29f7b0bf40d7218b318..51c061fa56671100fbc6ec460476ecc16c5a2ce4 100644 --- a/src/main/java/org/caosdb/server/grpc/GRPCServer.java +++ b/src/main/java/org/caosdb/server/grpc/GRPCServer.java @@ -117,8 +117,11 @@ public class GRPCServer { final GeneralInfoServiceImpl generalInfoService = new GeneralInfoServiceImpl(); services.add(ServerInterceptors.intercept(generalInfoService, authInterceptor)); + final FileTransmissionServiceImpl fileTransmissionService = new FileTransmissionServiceImpl(); + services.add(ServerInterceptors.intercept(fileTransmissionService, authInterceptor)); + final EntityTransactionServiceImpl entityTransactionService = - new EntityTransactionServiceImpl(); + new EntityTransactionServiceImpl(fileTransmissionService); services.add(ServerInterceptors.intercept(entityTransactionService, authInterceptor)); return services; diff --git a/src/main/java/org/caosdb/server/grpc/UploadBuffer.java b/src/main/java/org/caosdb/server/grpc/UploadBuffer.java new file mode 100644 index 0000000000000000000000000000000000000000..eae71e457fdf67e460c4adb3a04f76cdfdd32db8 --- /dev/null +++ b/src/main/java/org/caosdb/server/grpc/UploadBuffer.java @@ -0,0 +1,47 @@ +package org.caosdb.server.grpc; + +import com.google.protobuf.ByteString; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import org.caosdb.api.entity.v1alpha1.TransmissionStatus; +import org.caosdb.server.entity.FileProperties; + +public class UploadBuffer { + + TransmissionStatus status = TransmissionStatus.TRANSMISSION_STATUS_UNSPECIFIED; + private final File tmpFile; + + public UploadBuffer(final File tmpFile) { + this.tmpFile = tmpFile; + } + + public TransmissionStatus write(final ByteString data) throws IOException { + switch (status) { + case TRANSMISSION_STATUS_UNSPECIFIED: + status = TransmissionStatus.TRANSMISSION_STATUS_GO_ON; + break; + case TRANSMISSION_STATUS_GO_ON: + break; + default: + throw new RuntimeException("Wrong transmission state."); + } + try (final FileOutputStream fileOutputStream = new FileOutputStream(tmpFile, true)) { + data.writeTo(fileOutputStream); + } + return status; + } + + public FileProperties toFileProperties() { + switch (status) { + case TRANSMISSION_STATUS_SUCCESS: + break; + case TRANSMISSION_STATUS_GO_ON: + status = TransmissionStatus.TRANSMISSION_STATUS_SUCCESS; + break; + default: + throw new RuntimeException("Wrong transmission state."); + } + return new FileProperties(null, tmpFile.getAbsolutePath(), tmpFile.length()); + } +}