Skip to content
Snippets Groups Projects
Verified Commit 5706135e authored by Timm Fitschen's avatar Timm Fitschen
Browse files

WIP: files

parent 430f51e3
No related branches found
No related tags found
3 merge requests!44Release 0.6,!43Merge f-GRPC-main to dev,!26F grpc f files
Pipeline #12172 passed
......@@ -29,8 +29,11 @@ public class FileDownload extends FileTransmission {
@Override
public FileProperties getFile(final String fileId) {
synchronized (lock) {
touch();
return buffers.get(fileId).getFileProperties();
}
}
@Override
public FileTransmissionSettings getTransmissionSettings() {
......@@ -38,13 +41,19 @@ public class FileDownload extends FileTransmission {
}
public String append(final FileProperties fp) {
synchronized (lock) {
touch();
final String id = Utils.getUID();
buffers.put(id, new DownloadBuffer(fp));
return id;
}
}
public FileDownloadResponse getNextChunk(final String fileId)
throws FileNotFoundException, IOException {
synchronized (lock) {
touch();
return buffers.get(fileId).getNextChunk();
}
}
}
......@@ -3,7 +3,10 @@ package org.caosdb.server.grpc;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.caosdb.api.entity.v1alpha1.FileDownloadResponse;
import org.caosdb.api.entity.v1alpha1.FileTransmissionId;
import org.caosdb.api.entity.v1alpha1.FileTransmissionSettings;
......@@ -16,12 +19,14 @@ public class FileDownloadRegistration {
public FileTransmissionId registerFileDownload(
final String registration_id, final FileProperties fp) {
synchronized (registeredDownloads) {
final String file_id = registeredDownloads.get(registration_id).append(fp);
return FileTransmissionId.newBuilder()
.setRegistrationId(registration_id)
.setFileId(file_id)
.build();
}
}
public FileDownload registerFileDownload(final FileTransmissionSettings settings)
throws Exception {
......@@ -31,13 +36,36 @@ public class FileDownloadRegistration {
}
private void register(final FileDownload fileTransmission) {
synchronized (registeredDownloads) {
registeredDownloads.put(fileTransmission.getId(), fileTransmission);
}
}
public FileDownloadResponse downloadNextChunk(final FileTransmissionId fileTransmissionId)
throws FileNotFoundException, IOException {
return registeredDownloads
.get(fileTransmissionId.getRegistrationId())
.getNextChunk(fileTransmissionId.getFileId());
FileDownload next;
synchronized (registeredDownloads) {
next = registeredDownloads.get(fileTransmissionId.getRegistrationId());
}
return next.getNextChunk(fileTransmissionId.getFileId());
}
public void cleanUp(final boolean all) {
synchronized (registeredDownloads) {
final List<String> cleanUp = new LinkedList<>();
for (final Entry<String, FileDownload> entry : registeredDownloads.entrySet()) {
if (all || entry.getValue().isExpired()) {
cleanUp.add(entry.getKey());
}
}
for (final String key : cleanUp) {
registeredDownloads.get(key).cleanUp();
registeredDownloads.remove(key);
}
}
}
public void cleanUp() {
cleanUp(false);
}
}
......@@ -55,4 +55,8 @@ public abstract class FileTransmission {
public abstract FileProperties getFile(final String fileId);
public abstract FileTransmissionSettings getTransmissionSettings();
public boolean isExpired() {
return System.currentTimeMillis() - touchedTimestamp > 10 * 60 * 1000; // older than 10 min
}
}
......@@ -15,7 +15,9 @@ import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse;
import org.caosdb.api.entity.v1alpha1.RegisterFileUploadResponse.Builder;
import org.caosdb.api.entity.v1alpha1.RegistrationStatus;
import org.caosdb.api.entity.v1alpha1.TransmissionStatus;
import org.caosdb.server.CaosDBServer;
import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.utils.CronJob;
import org.caosdb.server.utils.Utils;
public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase {
......@@ -64,8 +66,25 @@ public class FileTransmissionServiceImpl extends FileTransmissionServiceImplBase
}
}
public FileTransmissionServiceImpl() {
CaosDBServer.addPreShutdownHook(
() -> {
fileDownloadRegistration.cleanUp(true);
fileUploadRegistration.cleanUp(true);
});
}
FileUploadRegistration fileUploadRegistration = new FileUploadRegistration();
FileDownloadRegistration fileDownloadRegistration = new FileDownloadRegistration();
CronJob cleanUp =
new CronJob(
"FileTransmissionCleanUp",
() -> {
fileUploadRegistration.cleanUp();
fileDownloadRegistration.cleanUp();
},
60,
false);
FileTransmissionId registerFileDownload(final String registration_id, final FileProperties fp)
throws Exception {
......
......@@ -40,11 +40,13 @@ public class FileUpload extends FileTransmission {
public TransmissionStatus upload(final String fileId, final ByteString data) throws IOException {
synchronized (lock) {
touch();
return getFileBuffer(fileId).write(data);
}
}
protected UploadBuffer getFileBuffer(final String fileId) {
touch();
if (!buffers.containsKey(fileId)) {
buffers.put(fileId, new UploadBuffer(getTmpDir().toPath().resolve(fileId).toFile()));
}
......@@ -54,6 +56,7 @@ public class FileUpload extends FileTransmission {
@Override
public FileProperties getFile(final String fileId) {
synchronized (lock) {
touch();
if (buffers.containsKey(fileId)) {
return buffers.get(fileId).toFileProperties(fileId);
}
......
package org.caosdb.server.grpc;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.caosdb.api.entity.v1alpha1.FileTransmissionId;
import org.caosdb.server.entity.FileProperties;
import org.caosdb.server.utils.Utils;
......@@ -17,18 +20,45 @@ public class FileUploadRegistration {
}
private void register(final FileUpload fileTransmission) {
synchronized (registeredUploads) {
registeredUploads.put(fileTransmission.getId(), fileTransmission);
}
}
public FileProperties getUploadFile(final FileTransmissionId uploadId) {
final String fileId = uploadId.getFileId();
final String registrationId = uploadId.getRegistrationId();
final FileTransmission fileTransmission = registeredUploads.get(registrationId);
final FileTransmission fileTransmission;
synchronized (registeredUploads) {
fileTransmission = registeredUploads.get(registrationId);
}
return fileTransmission.getFile(fileId);
}
public FileUpload getFileUpload(final String registrationId) {
synchronized (registeredUploads) {
return registeredUploads.get(registrationId);
}
}
public void cleanUp(final boolean all) {
synchronized (registeredUploads) {
final List<String> cleanUp = new LinkedList<>();
for (final Entry<String, FileUpload> entry : registeredUploads.entrySet()) {
if (all || entry.getValue().isExpired()) {
cleanUp.add(entry.getKey());
}
}
for (final String key : cleanUp) {
registeredUploads.get(key).cleanUp();
registeredUploads.remove(key);
}
}
}
public void cleanUp() {
cleanUp(false);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment