diff --git a/src/main/java/org/caosdb/server/grpc/FileDownload.java b/src/main/java/org/caosdb/server/grpc/FileDownload.java index d17f05c10cdd54e9eca1aad884bdf7272003bf00..d4fdda381ef7a1dc828c487e1d9357709b030a5c 100644 --- a/src/main/java/org/caosdb/server/grpc/FileDownload.java +++ b/src/main/java/org/caosdb/server/grpc/FileDownload.java @@ -29,7 +29,10 @@ public class FileDownload extends FileTransmission { @Override public FileProperties getFile(final String fileId) { - return buffers.get(fileId).getFileProperties(); + synchronized (lock) { + touch(); + return buffers.get(fileId).getFileProperties(); + } } @Override @@ -38,13 +41,19 @@ public class FileDownload extends FileTransmission { } public String append(final FileProperties fp) { - final String id = Utils.getUID(); - buffers.put(id, new DownloadBuffer(fp)); - return id; + synchronized (lock) { + touch(); + final String id = Utils.getUID(); + buffers.put(id, new DownloadBuffer(fp)); + return id; + } } public FileDownloadResponse getNextChunk(final String fileId) throws FileNotFoundException, IOException { - return buffers.get(fileId).getNextChunk(); + synchronized (lock) { + touch(); + return buffers.get(fileId).getNextChunk(); + } } } diff --git a/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java index b5c1e3efc9c25e893056d7b7acb2e4da477e85e9..579495a10ac49f13c5c21475c75efbb99ac4042a 100644 --- a/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java +++ b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java @@ -3,7 +3,10 @@ package org.caosdb.server.grpc; import java.io.FileNotFoundException; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings; @@ -16,11 +19,13 @@ public class FileDownloadRegistration { public FileTransmissionId registerFileDownload( final String registration_id, final FileProperties fp) { - final String file_id = registeredDownloads.get(registration_id).append(fp); - return FileTransmissionId.newBuilder() - .setRegistrationId(registration_id) - .setFileId(file_id) - .build(); + synchronized (registeredDownloads) { + final String file_id = registeredDownloads.get(registration_id).append(fp); + return FileTransmissionId.newBuilder() + .setRegistrationId(registration_id) + .setFileId(file_id) + .build(); + } } public FileDownload registerFileDownload(final FileTransmissionSettings settings) @@ -31,13 +36,36 @@ public class FileDownloadRegistration { } private void register(final FileDownload fileTransmission) { - registeredDownloads.put(fileTransmission.getId(), fileTransmission); + synchronized (registeredDownloads) { + registeredDownloads.put(fileTransmission.getId(), fileTransmission); + } } public FileDownloadResponse downloadNextChunk(final FileTransmissionId fileTransmissionId) throws FileNotFoundException, IOException { - return registeredDownloads - .get(fileTransmissionId.getRegistrationId()) - .getNextChunk(fileTransmissionId.getFileId()); + FileDownload next; + synchronized (registeredDownloads) { + next = registeredDownloads.get(fileTransmissionId.getRegistrationId()); + } + return next.getNextChunk(fileTransmissionId.getFileId()); + } + + public void cleanUp(final boolean all) { + synchronized (registeredDownloads) { + final List<String> cleanUp = new LinkedList<>(); + for (final Entry<String, FileDownload> entry : registeredDownloads.entrySet()) { + if (all || entry.getValue().isExpired()) { + cleanUp.add(entry.getKey()); + } + } + for (final String key : cleanUp) { + registeredDownloads.get(key).cleanUp(); + registeredDownloads.remove(key); + } + } + } + + public void cleanUp() { + cleanUp(false); } } diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmission.java b/src/main/java/org/caosdb/server/grpc/FileTransmission.java index 09631b4efc26b6ae842bf03051ae7fe3c194f65c..5440b15cbbdb444f595ea69845300aad1ae0e0d3 100644 --- a/src/main/java/org/caosdb/server/grpc/FileTransmission.java +++ b/src/main/java/org/caosdb/server/grpc/FileTransmission.java @@ -55,4 +55,8 @@ public abstract class FileTransmission { public abstract FileProperties getFile(final String fileId); public abstract FileTransmissionSettings getTransmissionSettings(); + + public boolean isExpired() { + return System.currentTimeMillis() - touchedTimestamp > 10 * 60 * 1000; // older than 10 min + } } diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java index 58b317b5e1b039ceb5ebd39ecd15ff560409493a..8007dcb1658d7dc6c76a0bf63efca4854045203c 100644 --- a/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java +++ b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java @@ -15,7 +15,9 @@ import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder; import org.caosdb.api.entity.v1alpha1.RegistrationStatus; import org.caosdb.api.entity.v1alpha1.TransmissionStatus; +import org.caosdb.server.CaosDBServer; import org.caosdb.server.entity.FileProperties; +import org.caosdb.server.utils.CronJob; import org.caosdb.server.utils.Utils; public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase { @@ -64,8 +66,25 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase } } + public FileTransmissionServiceImpl() { + CaosDBServer.addPreShutdownHook( + () -> { + fileDownloadRegistration.cleanUp(true); + fileUploadRegistration.cleanUp(true); + }); + } + FileUploadRegistration fileUploadRegistration = new FileUploadRegistration(); FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration(); + CronJob cleanUp = + new CronJob( + "FileTransmissionCleanUp", + () -> { + fileUploadRegistration.cleanUp(); + fileDownloadRegistration.cleanUp(); + }, + 60, + false); FileTransmissionId registerFileDownload(final String registration_id, final FileProperties fp) throws Exception { diff --git a/src/main/java/org/caosdb/server/grpc/FileUpload.java b/src/main/java/org/caosdb/server/grpc/FileUpload.java index eedca00d6a266693f6a70a3986692c7325a9a9c8..c417fd7d2b23841c916fad97a6a6678bd1cd8f21 100644 --- a/src/main/java/org/caosdb/server/grpc/FileUpload.java +++ b/src/main/java/org/caosdb/server/grpc/FileUpload.java @@ -40,11 +40,13 @@ public class FileUpload extends FileTransmission { public TransmissionStatus upload(final String fileId, final ByteString data) throws IOException { synchronized (lock) { + touch(); return getFileBuffer(fileId).write(data); } } protected UploadBuffer getFileBuffer(final String fileId) { + touch(); if (!buffers.containsKey(fileId)) { buffers.put(fileId, new UploadBuffer(getTmpDir().toPath().resolve(fileId).toFile())); } @@ -54,6 +56,7 @@ public class FileUpload extends FileTransmission { @Override public FileProperties getFile(final String fileId) { synchronized (lock) { + touch(); if (buffers.containsKey(fileId)) { return buffers.get(fileId).toFileProperties(fileId); } diff --git a/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java index 5612d08f300d3fcb97f9aa1831fdd9ad1806f12f..7e631fadc923f18d5302138c6fa6d494fff6bf13 100644 --- a/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java +++ b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java @@ -1,7 +1,10 @@ package org.caosdb.server.grpc; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.server.entity.FileProperties; import org.caosdb.server.utils.Utils; @@ -17,18 +20,45 @@ public class FileUploadRegistration { } private void register(final FileUpload fileTransmission) { - registeredUploads.put(fileTransmission.getId(), fileTransmission); + + synchronized (registeredUploads) { + registeredUploads.put(fileTransmission.getId(), fileTransmission); + } } public FileProperties getUploadFile(final FileTransmissionId uploadId) { final String fileId = uploadId.getFileId(); final String registrationId = uploadId.getRegistrationId(); - final FileTransmission fileTransmission = registeredUploads.get(registrationId); + final FileTransmission fileTransmission; + synchronized (registeredUploads) { + fileTransmission = registeredUploads.get(registrationId); + } return fileTransmission.getFile(fileId); } public FileUpload getFileUpload(final String registrationId) { - return registeredUploads.get(registrationId); + synchronized (registeredUploads) { + return registeredUploads.get(registrationId); + } + } + + public void cleanUp(final boolean all) { + synchronized (registeredUploads) { + final List<String> cleanUp = new LinkedList<>(); + for (final Entry<String, FileUpload> entry : registeredUploads.entrySet()) { + if (all || entry.getValue().isExpired()) { + cleanUp.add(entry.getKey()); + } + } + for (final String key : cleanUp) { + registeredUploads.get(key).cleanUp(); + registeredUploads.remove(key); + } + } + } + + public void cleanUp() { + cleanUp(false); } }