From 5706135e8dcf027e17c0b981200315b7c23e0878 Mon Sep 17 00:00:00 2001 From: Timm Fitschen <t.fitschen@indiscale.com> Date: Mon, 16 Aug 2021 02:27:12 +0200 Subject: [PATCH] WIP: files --- .../org/caosdb/server/grpc/FileDownload.java | 19 ++++++-- .../server/grpc/FileDownloadRegistration.java | 46 +++++++++++++++---- .../caosdb/server/grpc/FileTransmission.java | 4 ++ .../grpc/FileTransmissionServiceImpl.java | 19 ++++++++ .../org/caosdb/server/grpc/FileUpload.java | 3 ++ .../server/grpc/FileUploadRegistration.java | 36 +++++++++++++-- 6 files changed, 110 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/caosdb/server/grpc/FileDownload.java b/src/main/java/org/caosdb/server/grpc/FileDownload.java index d17f05c1..d4fdda38 100644 --- a/src/main/java/org/caosdb/server/grpc/FileDownload.java +++ b/src/main/java/org/caosdb/server/grpc/FileDownload.java @@ -29,7 +29,10 @@ public class FileDownload extends FileTransmission { @Override public FileProperties getFile(final String fileId) { - return buffers.get(fileId).getFileProperties(); + synchronized (lock) { + touch(); + return buffers.get(fileId).getFileProperties(); + } } @Override @@ -38,13 +41,19 @@ public class FileDownload extends FileTransmission { } public String append(final FileProperties fp) { - final String id = Utils.getUID(); - buffers.put(id, new DownloadBuffer(fp)); - return id; + synchronized (lock) { + touch(); + final String id = Utils.getUID(); + buffers.put(id, new DownloadBuffer(fp)); + return id; + } } public FileDownloadResponse getNextChunk(final String fileId) throws FileNotFoundException, IOException { - return buffers.get(fileId).getNextChunk(); + synchronized (lock) { + touch(); + return buffers.get(fileId).getNextChunk(); + } } } diff --git a/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java index b5c1e3ef..579495a1 100644 --- a/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java +++ b/src/main/java/org/caosdb/server/grpc/FileDownloadRegistration.java @@ -3,7 +3,10 @@ package org.caosdb.server.grpc; import java.io.FileNotFoundException; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.caosdb.api.entity.v1alpha1.FileDownloadResponse; import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings; @@ -16,11 +19,13 @@ public class FileDownloadRegistration { public FileTransmissionId registerFileDownload( final String registration_id, final FileProperties fp) { - final String file_id = registeredDownloads.get(registration_id).append(fp); - return FileTransmissionId.newBuilder() - .setRegistrationId(registration_id) - .setFileId(file_id) - .build(); + synchronized (registeredDownloads) { + final String file_id = registeredDownloads.get(registration_id).append(fp); + return FileTransmissionId.newBuilder() + .setRegistrationId(registration_id) + .setFileId(file_id) + .build(); + } } public FileDownload registerFileDownload(final FileTransmissionSettings settings) @@ -31,13 +36,36 @@ public class FileDownloadRegistration { } private void register(final FileDownload fileTransmission) { - registeredDownloads.put(fileTransmission.getId(), fileTransmission); + synchronized (registeredDownloads) { + registeredDownloads.put(fileTransmission.getId(), fileTransmission); + } } public FileDownloadResponse downloadNextChunk(final FileTransmissionId fileTransmissionId) throws FileNotFoundException, IOException { - return registeredDownloads - .get(fileTransmissionId.getRegistrationId()) - .getNextChunk(fileTransmissionId.getFileId()); + FileDownload next; + synchronized (registeredDownloads) { + next = registeredDownloads.get(fileTransmissionId.getRegistrationId()); + } + return next.getNextChunk(fileTransmissionId.getFileId()); + } + + public void cleanUp(final boolean all) { + synchronized (registeredDownloads) { + final List<String> cleanUp = new LinkedList<>(); + for (final Entry<String, FileDownload> entry : registeredDownloads.entrySet()) { + if (all || entry.getValue().isExpired()) { + cleanUp.add(entry.getKey()); + } + } + for (final String key : cleanUp) { + registeredDownloads.get(key).cleanUp(); + registeredDownloads.remove(key); + } + } + } + + public void cleanUp() { + cleanUp(false); } } diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmission.java b/src/main/java/org/caosdb/server/grpc/FileTransmission.java index 09631b4e..5440b15c 100644 --- a/src/main/java/org/caosdb/server/grpc/FileTransmission.java +++ b/src/main/java/org/caosdb/server/grpc/FileTransmission.java @@ -55,4 +55,8 @@ public abstract class FileTransmission { public abstract FileProperties getFile(final String fileId); public abstract FileTransmissionSettings getTransmissionSettings(); + + public boolean isExpired() { + return System.currentTimeMillis() - touchedTimestamp > 10 * 60 * 1000; // older than 10 min + } } diff --git a/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java index 58b317b5..8007dcb1 100644 --- a/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java +++ b/src/main/java/org/caosdb/server/grpc/FileTransmissionServiceImpl.java @@ -15,7 +15,9 @@ import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse; import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder; import org.caosdb.api.entity.v1alpha1.RegistrationStatus; import org.caosdb.api.entity.v1alpha1.TransmissionStatus; +import org.caosdb.server.CaosDBServer; import org.caosdb.server.entity.FileProperties; +import org.caosdb.server.utils.CronJob; import org.caosdb.server.utils.Utils; public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase { @@ -64,8 +66,25 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase } } + public FileTransmissionServiceImpl() { + CaosDBServer.addPreShutdownHook( + () -> { + fileDownloadRegistration.cleanUp(true); + fileUploadRegistration.cleanUp(true); + }); + } + FileUploadRegistration fileUploadRegistration = new FileUploadRegistration(); FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration(); + CronJob cleanUp = + new CronJob( + "FileTransmissionCleanUp", + () -> { + fileUploadRegistration.cleanUp(); + fileDownloadRegistration.cleanUp(); + }, + 60, + false); FileTransmissionId registerFileDownload(final String registration_id, final FileProperties fp) throws Exception { diff --git a/src/main/java/org/caosdb/server/grpc/FileUpload.java b/src/main/java/org/caosdb/server/grpc/FileUpload.java index eedca00d..c417fd7d 100644 --- a/src/main/java/org/caosdb/server/grpc/FileUpload.java +++ b/src/main/java/org/caosdb/server/grpc/FileUpload.java @@ -40,11 +40,13 @@ public class FileUpload extends FileTransmission { public TransmissionStatus upload(final String fileId, final ByteString data) throws IOException { synchronized (lock) { + touch(); return getFileBuffer(fileId).write(data); } } protected UploadBuffer getFileBuffer(final String fileId) { + touch(); if (!buffers.containsKey(fileId)) { buffers.put(fileId, new UploadBuffer(getTmpDir().toPath().resolve(fileId).toFile())); } @@ -54,6 +56,7 @@ public class FileUpload extends FileTransmission { @Override public FileProperties getFile(final String fileId) { synchronized (lock) { + touch(); if (buffers.containsKey(fileId)) { return buffers.get(fileId).toFileProperties(fileId); } diff --git a/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java index 5612d08f..7e631fad 100644 --- a/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java +++ b/src/main/java/org/caosdb/server/grpc/FileUploadRegistration.java @@ -1,7 +1,10 @@ package org.caosdb.server.grpc; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.caosdb.api.entity.v1alpha1.FileTransmissionId; import org.caosdb.server.entity.FileProperties; import org.caosdb.server.utils.Utils; @@ -17,18 +20,45 @@ public class FileUploadRegistration { } private void register(final FileUpload fileTransmission) { - registeredUploads.put(fileTransmission.getId(), fileTransmission); + + synchronized (registeredUploads) { + registeredUploads.put(fileTransmission.getId(), fileTransmission); + } } public FileProperties getUploadFile(final FileTransmissionId uploadId) { final String fileId = uploadId.getFileId(); final String registrationId = uploadId.getRegistrationId(); - final FileTransmission fileTransmission = registeredUploads.get(registrationId); + final FileTransmission fileTransmission; + synchronized (registeredUploads) { + fileTransmission = registeredUploads.get(registrationId); + } return fileTransmission.getFile(fileId); } public FileUpload getFileUpload(final String registrationId) { - return registeredUploads.get(registrationId); + synchronized (registeredUploads) { + return registeredUploads.get(registrationId); + } + } + + public void cleanUp(final boolean all) { + synchronized (registeredUploads) { + final List<String> cleanUp = new LinkedList<>(); + for (final Entry<String, FileUpload> entry : registeredUploads.entrySet()) { + if (all || entry.getValue().isExpired()) { + cleanUp.add(entry.getKey()); + } + } + for (final String key : cleanUp) { + registeredUploads.get(key).cleanUp(); + registeredUploads.remove(key); + } + } + } + + public void cleanUp() { + cleanUp(false); } } -- GitLab