Skip to content
Snippets Groups Projects
Verified Commit d4bc97b6 authored by Timm Fitschen's avatar Timm Fitschen
Browse files

fix deadlock

parent a2f0f4e3
No related branches found
No related tags found
1 merge request!4f-fix-deadlock -> dev
Pipeline #5338 failed
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
package org.caosdb.server.database; package org.caosdb.server.database;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.caosdb.server.database.access.Access; import org.caosdb.server.database.access.Access;
import org.caosdb.server.database.access.AccessControlAccess; import org.caosdb.server.database.access.AccessControlAccess;
...@@ -56,7 +57,7 @@ import org.caosdb.server.utils.Releasable; ...@@ -56,7 +57,7 @@ import org.caosdb.server.utils.Releasable;
class ReadAccessSemaphore extends Semaphore implements Releasable { class ReadAccessSemaphore extends Semaphore implements Releasable {
private static final long serialVersionUID = 4384921156838881337L; private static final long serialVersionUID = 4384921156838881337L;
private int acquired = 0; // how many threads have read access private AtomicInteger acquired = new AtomicInteger(0); // how many threads have read access
Semaphore writersBlock = Semaphore writersBlock =
new Semaphore(1, true); // This semaphore is blocked as long as there are any new Semaphore(1, true); // This semaphore is blocked as long as there are any
// unreleased read permits. // unreleased read permits.
...@@ -73,10 +74,9 @@ class ReadAccessSemaphore extends Semaphore implements Releasable { ...@@ -73,10 +74,9 @@ class ReadAccessSemaphore extends Semaphore implements Releasable {
@Override @Override
public void acquire() throws InterruptedException { public void acquire() throws InterruptedException {
super.acquire(); // Protect the next few lines super.acquire(); // Protect the next few lines
if (this.acquired == 0) { if (this.acquired.getAndIncrement() == 0) {
this.writersBlock.acquire(); this.writersBlock.acquire();
} }
this.acquired++;
super.release(); super.release();
} }
...@@ -87,9 +87,7 @@ class ReadAccessSemaphore extends Semaphore implements Releasable { ...@@ -87,9 +87,7 @@ class ReadAccessSemaphore extends Semaphore implements Releasable {
*/ */
@Override @Override
public void release() { public void release() {
this.acquired--; if (this.acquired.decrementAndGet() == 0) { // Last permit: release
if (this.acquired <= 0) { // Last permit: release
this.acquired = 0;
if (this.writersBlock.availablePermits() <= 0) { if (this.writersBlock.availablePermits() <= 0) {
this.writersBlock.release(); this.writersBlock.release();
} }
......
...@@ -22,6 +22,10 @@ ...@@ -22,6 +22,10 @@
*/ */
package org.caosdb.server.database; package org.caosdb.server.database;
import static org.junit.Assert.assertFalse;
import java.util.LinkedList;
import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
...@@ -29,6 +33,85 @@ public class DatabaseAccessManagerTest { ...@@ -29,6 +33,85 @@ public class DatabaseAccessManagerTest {
public static final ReadAccessSemaphore readAccess = new ReadAccessSemaphore(); public static final ReadAccessSemaphore readAccess = new ReadAccessSemaphore();
public static final WriteAccessLock writeAccess = new WriteAccessLock(readAccess); public static final WriteAccessLock writeAccess = new WriteAccessLock(readAccess);
Thread createReadThread(long wait, String name) {
return new Thread(
new Runnable() {
@Override
public void run() {
try {
readAccess.acquire();
Thread.sleep(wait);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readAccess.release();
}
System.out.println(Thread.currentThread().getName() + " terminated");
}
},
name);
}
Thread createWriteThread(long wait, String name) {
return new Thread(
new Runnable() {
@Override
public void run() {
try {
writeAccess.reserve();
Thread.sleep(wait);
writeAccess.lockInterruptibly();
Thread.sleep(wait);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeAccess.release();
}
System.out.println(Thread.currentThread().getName() + " terminated");
}
},
name);
}
@Test
public void testDeadLock() throws InterruptedException {
List<Thread> ts = new LinkedList<>();
for (int i = 0; i < 1000; i++) {
Thread t1 = createReadThread(1, "Ra" + i);
Thread t2 = createReadThread(2, "Rb" + i);
Thread t3 = createReadThread(3, "Rc" + i);
Thread t5 = createReadThread(5, "Rd" + i);
Thread t7 = createReadThread(7, "Re" + i);
Thread t11 = createReadThread(11, "Rf" + i);
Thread w5 = createWriteThread(2, "W" + i);
t1.start();
t2.start();
w5.start();
t3.start();
t5.start();
t7.start();
t11.start();
ts.add(t1);
ts.add(t2);
ts.add(t3);
ts.add(t5);
ts.add(t7);
ts.add(t11);
ts.add(w5);
}
for (Thread t : ts) {
t.join();
}
for (Thread t : ts) {
assertFalse(t.isAlive());
}
}
/** /**
* Two read-, two write-threads. The read-threads request read access, the write-threads request * Two read-, two write-threads. The read-threads request read access, the write-threads request
* write access.<br> * write access.<br>
......
...@@ -70,8 +70,8 @@ public class QueryTest { ...@@ -70,8 +70,8 @@ public class QueryTest {
/** /**
* Assure that {@link WriteTransaction#commit()} calls {@link Query#clearCache()}. * Assure that {@link WriteTransaction#commit()} calls {@link Query#clearCache()}.
* *
* Since currently the cache shall be cleared whenever there is a commit. * <p>Since currently the cache shall be cleared whenever there is a commit.
* */ */
@Test @Test
public void testEtagChangesAfterWrite() { public void testEtagChangesAfterWrite() {
String old = Query.getETag(); String old = Query.getETag();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment