Skip to content
Snippets Groups Projects
Verified Commit feb6cfa9 authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP: upload/download files

parent 4944f340
No related branches found
No related tags found
3 merge requests!44Release 0.6,!43Merge f-GRPC-main to dev,!26F grpc f files
Pipeline #11200 failed
caosdb-proto @ e6188445
Subproject commit daf705ef4e86c30f96b3aac429a66667f50c7b77 Subproject commit e6188445188a88c16c9a603060b40334e509ece6
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>org.caosdb</groupId> <groupId>org.caosdb</groupId>
<artifactId>caosdb-server</artifactId> <artifactId>caosdb-server</artifactId>
<version>0.5.0-GRPC0.0.9</version> <version>0.5.0-GRPC0.0.10</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>CaosDB Server</name> <name>CaosDB Server</name>
<scm> <scm>
......
...@@ -9,6 +9,7 @@ import java.util.UUID; ...@@ -9,6 +9,7 @@ import java.util.UUID;
import org.apache.shiro.SecurityUtils; import org.apache.shiro.SecurityUtils;
import org.caosdb.api.entity.v1alpha1.Entity; import org.caosdb.api.entity.v1alpha1.Entity;
import org.caosdb.api.entity.v1alpha1.Entity.Builder; import org.caosdb.api.entity.v1alpha1.Entity.Builder;
import org.caosdb.api.entity.v1alpha1.EntityRequest;
import org.caosdb.api.entity.v1alpha1.EntityTransactionServiceGrpc.EntityTransactionServiceImplBase; import org.caosdb.api.entity.v1alpha1.EntityTransactionServiceGrpc.EntityTransactionServiceImplBase;
import org.caosdb.api.entity.v1alpha1.IdResponse; import org.caosdb.api.entity.v1alpha1.IdResponse;
import org.caosdb.api.entity.v1alpha1.MessageCode; import org.caosdb.api.entity.v1alpha1.MessageCode;
...@@ -25,6 +26,7 @@ import org.caosdb.server.datatype.GenericValue; ...@@ -25,6 +26,7 @@ import org.caosdb.server.datatype.GenericValue;
import org.caosdb.server.datatype.Value; import org.caosdb.server.datatype.Value;
import org.caosdb.server.entity.DeleteEntity; import org.caosdb.server.entity.DeleteEntity;
import org.caosdb.server.entity.EntityInterface; import org.caosdb.server.entity.EntityInterface;
import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.entity.InsertEntity; import org.caosdb.server.entity.InsertEntity;
import org.caosdb.server.entity.MagicTypes; import org.caosdb.server.entity.MagicTypes;
import org.caosdb.server.entity.Message; import org.caosdb.server.entity.Message;
...@@ -43,6 +45,12 @@ import org.caosdb.server.utils.ServerMessages; ...@@ -43,6 +45,12 @@ import org.caosdb.server.utils.ServerMessages;
public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBase { public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBase {
public EntityTransactionServiceImpl(final FileTransmissionServiceImpl fileTransmissionService) {
this.fileTransmissionService = fileTransmissionService;
}
private final FileTransmissionServiceImpl fileTransmissionService;
public String getStringUnit(final EntityInterface entity) { public String getStringUnit(final EntityInterface entity) {
for (final Property p : entity.getProperties()) { for (final Property p : entity.getProperties()) {
if (MagicTypes.UNIT.getId() == p.getId()) { if (MagicTypes.UNIT.getId() == p.getId()) {
...@@ -319,12 +327,17 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa ...@@ -319,12 +327,17 @@ public class EntityTransactionServiceImpl extends EntityTransactionServiceImplBa
return builder.build(); return builder.build();
} }
private IdResponse insert(final Entity insertEntity) throws Exception { private IdResponse insert(final EntityRequest entityRequest) throws Exception {
final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder = IdResponse.newBuilder(); final org.caosdb.api.entity.v1alpha1.IdResponse.Builder builder = IdResponse.newBuilder();
final WritableContainer container = final WritableContainer container =
new WritableContainer(SecurityUtils.getSubject(), getTimestamp(), getSRID(), null); new WritableContainer(SecurityUtils.getSubject(), getTimestamp(), getSRID(), null);
final InsertEntity entity = convert(insertEntity); final InsertEntity entity = convert(entityRequest.getEntity());
if (entityRequest.hasUploadId()) {
final FileProperties uploadFile =
fileTransmissionService.getUploadFile(entityRequest.getUploadId());
container.addFile(uploadFile.getTmpIdentifyer(), uploadFile);
}
container.add(entity); container.add(entity);
final WriteTransaction transaction = new WriteTransaction(container); final WriteTransaction transaction = new WriteTransaction(container);
transaction.execute(); transaction.execute();
......
package org.caosdb.server.grpc;
import java.util.List;
import org.caosdb.api.entity.v1alpha1.FileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.FileIdentifier;
public class FileDownloadRegistration {
public FileTransmission registerFileDownload(
final List<FileIdentifier> filesList, final long maxChunkSize, final long maxFileSize) {
// TODO Auto-generated method stub
return null;
}
public FileDownloadResponse downloadNextChunk(final String registrationId) {
// TODO Auto-generated method stub
return null;
}
}
package org.caosdb.server.grpc;
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.caosdb.api.entity.v1alpha1.RegistrationStatus;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.FileSystem;
import org.caosdb.server.entity.FileProperties;
public class FileTransmission {
ReentrantLock lock = new ReentrantLock();
Map<String, UploadBuffer> buffers = new HashMap<>();
private final String id;
private File tmpDir;
private final long createdTimestamp;
private long touchedTimestamp;
private final RegistrationStatus status;
public FileTransmission(final String id) {
this.status = RegistrationStatus.REGISTRATION_STATUS_ACCEPTED;
this.createdTimestamp = System.currentTimeMillis();
this.touchedTimestamp = createdTimestamp;
this.tmpDir = null;
this.id = id;
}
public void cleanUp() {
if (tmpDir != null) {
org.apache.commons.io.FileUtils.deleteQuietly(tmpDir);
}
}
private File getTmpDir() {
if (tmpDir == null) {
tmpDir = new File(FileSystem.getTmp() + id + "/");
tmpDir.mkdirs();
}
return tmpDir;
}
public long getCreatedTimestamp() {
return createdTimestamp;
}
public long getTouchedTimestamp() {
return this.touchedTimestamp;
}
public void touch() {
this.touchedTimestamp = System.currentTimeMillis();
}
public String getId() {
return this.id;
}
public RegistrationStatus getRegistrationStatus() {
return status;
}
public long getMaxChunkSize() {
// 2^24, 16.78 MB
return 16777216;
}
public long getMaxFileSize() {
// 2^30, 1.074 GB
return 1073741824;
}
public TransmissionStatus upload(final String fileId, final int chunkNo, final ByteString data)
throws IOException {
synchronized (lock) {
return getFileBuffer(fileId).write(data);
}
}
private UploadBuffer getFileBuffer(final String fileId) {
if (!buffers.containsKey(fileId)) {
buffers.put(fileId, new UploadBuffer(getTmpDir()));
}
return buffers.get(fileId);
}
public FileProperties getFile(final String fileId) {
synchronized (lock) {
if (buffers.containsKey(fileId)) {
return buffers.get(fileId).toFileProperties();
}
return null;
}
}
}
package org.caosdb.server.grpc;
import io.grpc.stub.StreamObserver;
import org.caosdb.api.entity.v1alpha1.FileChunk;
import org.caosdb.api.entity.v1alpha1.FileDownloadRequest;
import org.caosdb.api.entity.v1alpha1.FileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.FileTransmissionId;
import org.caosdb.api.entity.v1alpha1.FileTransmissionServiceGrpc.FileTransmissionServiceImplBase;
import org.caosdb.api.entity.v1alpha1.FileUploadRequest;
import org.caosdb.api.entity.v1alpha1.FileUploadResponse;
import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadRequest;
import org.caosdb.api.entity.v1alpha1.RegisterFileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.RegisterFileUploadRequest;
import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse;
import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder;
import org.caosdb.api.entity.v1alpha1.RegistrationStatus;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.entity.FileProperties;
public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase {
FileUploadRegistration fileUploadRegistration = new FileUploadRegistration();
FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration();
public FileProperties getUploadFile(final FileTransmissionId uploadId) {
return fileUploadRegistration.getUploadFile(uploadId);
}
@Override
public void registerFileUpload(
final RegisterFileUploadRequest request,
final StreamObserver<RegisterFileUploadResponse> responseObserver) {
try {
final FileTransmission result = fileUploadRegistration.registerFileUpload();
final Builder builder = RegisterFileUploadResponse.newBuilder();
builder.setStatus(result.getRegistrationStatus());
if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) {
builder.setRegistrationId(result.getId());
builder.setMaxChunkSize(result.getMaxChunkSize());
builder.setMaxFileSize(result.getMaxFileSize());
}
final RegisterFileUploadResponse response = builder.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (final Exception e) {
e.printStackTrace();
responseObserver.onError(e);
}
}
@Override
public void fileUpload(
final FileUploadRequest request, final StreamObserver<FileUploadResponse> responseObserver) {
try {
final FileChunk chunk = request.getChunk();
final TransmissionStatus status = fileUploadRegistration.uploadChunk(chunk);
final FileUploadResponse response = FileUploadResponse.newBuilder().setStatus(status).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (final Exception e) {
e.printStackTrace();
responseObserver.onError(e);
}
}
@Override
public void registerFileDownload(
final RegisterFileDownloadRequest request,
final StreamObserver<RegisterFileDownloadResponse> responseObserver) {
try {
final FileTransmission result =
fileDownloadRegistration.registerFileDownload(
request.getFilesList(), request.getMaxChunkSize(), request.getMaxFileSize());
final RegisterFileDownloadResponse.Builder builder =
RegisterFileDownloadResponse.newBuilder();
builder.setStatus(result.getRegistrationStatus());
if (result.getRegistrationStatus() == RegistrationStatus.REGISTRATION_STATUS_ACCEPTED) {
builder.setRegistrationId(result.getId());
}
final RegisterFileDownloadResponse response = builder.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (final Exception e) {
e.printStackTrace();
responseObserver.onError(e);
}
}
@Override
public void fileDownload(
final FileDownloadRequest request,
final StreamObserver<FileDownloadResponse> responseObserver) {
try {
final FileDownloadResponse response =
fileDownloadRegistration.downloadNextChunk(request.getRegistrationId());
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (final Exception e) {
e.printStackTrace();
responseObserver.onError(e);
}
}
}
package org.caosdb.server.grpc;
import java.util.HashMap;
import java.util.Map;
import org.caosdb.api.entity.v1alpha1.FileChunk;
import org.caosdb.api.entity.v1alpha1.FileTransmissionId;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.utils.Utils;
public class FileUploadRegistration {
private final Map<String, FileTransmission> registeredUploads = new HashMap<>();
public FileTransmission registerFileUpload() {
final FileTransmission result = new FileTransmission(Utils.getUID());
register(result);
return result;
}
private void register(final FileTransmission fileTransmission) {
registeredUploads.put(fileTransmission.getId(), fileTransmission);
}
public FileProperties getUploadFile(final FileTransmissionId uploadId) {
final String fileId = uploadId.getFileId();
final String registrationId = uploadId.getRegistrationId();
final FileTransmission fileTransmission = registeredUploads.get(registrationId);
return fileTransmission.getFile(fileId);
}
public TransmissionStatus uploadChunk(final FileChunk chunk) {
final String fileId = chunk.getId().getFileTransmissionId().getFileId();
final String registrationId = chunk.getId().getFileTransmissionId().getRegistrationId();
final FileTransmission fileTransmission = registeredUploads.get(registrationId);
return fileTransmission.upload(fileId, chunkNo, chunk.getData());
}
}
...@@ -117,8 +117,11 @@ public class GRPCServer { ...@@ -117,8 +117,11 @@ public class GRPCServer {
final GeneralInfoServiceImpl generalInfoService = new GeneralInfoServiceImpl(); final GeneralInfoServiceImpl generalInfoService = new GeneralInfoServiceImpl();
services.add(ServerInterceptors.intercept(generalInfoService, authInterceptor)); services.add(ServerInterceptors.intercept(generalInfoService, authInterceptor));
final FileTransmissionServiceImpl fileTransmissionService = new FileTransmissionServiceImpl();
services.add(ServerInterceptors.intercept(fileTransmissionService, authInterceptor));
final EntityTransactionServiceImpl entityTransactionService = final EntityTransactionServiceImpl entityTransactionService =
new EntityTransactionServiceImpl(); new EntityTransactionServiceImpl(fileTransmissionService);
services.add(ServerInterceptors.intercept(entityTransactionService, authInterceptor)); services.add(ServerInterceptors.intercept(entityTransactionService, authInterceptor));
return services; return services;
......
package org.caosdb.server.grpc;
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.entity.FileProperties;
public class UploadBuffer {
TransmissionStatus status = TransmissionStatus.TRANSMISSION_STATUS_UNSPECIFIED;
private final File tmpFile;
public UploadBuffer(final File tmpFile) {
this.tmpFile = tmpFile;
}
public TransmissionStatus write(final ByteString data) throws IOException {
switch (status) {
case TRANSMISSION_STATUS_UNSPECIFIED:
status = TransmissionStatus.TRANSMISSION_STATUS_GO_ON;
break;
case TRANSMISSION_STATUS_GO_ON:
break;
default:
throw new RuntimeException("Wrong transmission state.");
}
try (final FileOutputStream fileOutputStream = new FileOutputStream(tmpFile, true)) {
data.writeTo(fileOutputStream);
}
return status;
}
public FileProperties toFileProperties() {
switch (status) {
case TRANSMISSION_STATUS_SUCCESS:
break;
case TRANSMISSION_STATUS_GO_ON:
status = TransmissionStatus.TRANSMISSION_STATUS_SUCCESS;
break;
default:
throw new RuntimeException("Wrong transmission state.");
}
return new FileProperties(null, tmpFile.getAbsolutePath(), tmpFile.length());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment