diff --git a/pom.xml b/pom.xml
index 4160cb3..3178346 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
de.infinimotion
model-persistence
- 0.0.44
+ 0.0.54
@@ -55,6 +55,11 @@
io.quarkus
quarkus-jdbc-postgresql
+
+ org.projectlombok
+ lombok
+ 1.18.32
+
diff --git a/src/main/java/de/infinimotion/persistence/CommandObserver.java b/src/main/java/de/infinimotion/persistence/CommandObserver.java
index e7b6b61..3348647 100644
--- a/src/main/java/de/infinimotion/persistence/CommandObserver.java
+++ b/src/main/java/de/infinimotion/persistence/CommandObserver.java
@@ -1,28 +1,44 @@
package de.infinimotion.persistence;
import de.infinimotion.model.persistence.*;
+import de.infinimotion.persistence.transaction.InitiateRollbackException;
+import de.infinimotion.persistence.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
-import org.eclipse.microprofile.reactive.messaging.Outgoing;
-
-import java.io.IOException;
@ApplicationScoped
public class CommandObserver {
+ @Channel("command-replies")
+ Emitter replyEmitter;
+
@Incoming("command")
- @Outgoing("command-replies")
- public CommandWrapper process(CommandWrapper request) throws IOException {
+ public void process(CommandWrapper request) {
System.out.println(request);
- try {
- return request.process().serialize().copyIds(request);
- } catch (Throwable t) {
- t.printStackTrace();
- CommandException e = new CommandException();
- e.setException(t.getMessage());
- return e.serialize().copyIds(request);
- }
+ Transaction tx = Transaction.get(request.getTransaction());
+ tx.queue(() -> {
+ try {
+ CommandWrapper response = request.process().serialize().copyIds(request);
+ if (request.isCommit()) {
+ tx.commit(replyEmitter, response);
+ } else {
+ replyEmitter.send(response);
+ }
+ } catch (InitiateRollbackException e) {
+ tx.rollback();
+ replyEmitter.send(new CommandRollbackResponse().serialize().copyIds(request));
+ } catch (Throwable t) {
+ tx.rollback();
+ t.printStackTrace();
+ CommandException e = new CommandException();
+ e.setException(t.getMessage());
+ replyEmitter.send(e.serialize().copyIds(request));
+ }
+ });
}
+
}
diff --git a/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java b/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java
index f643db0..828dd15 100644
--- a/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java
+++ b/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java
@@ -5,7 +5,6 @@ import io.quarkus.arc.Unremovable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
-import jakarta.transaction.Transactional;
import java.util.List;
@@ -16,7 +15,6 @@ public class KinosaalProcessor implements CommandListKinosaalProcessor, CommandC
@Inject
EntityManager em;
- @Transactional
public Command processCommandListKinosaal(CommandListKinosaal request) {
List results = em.createQuery("SELECT k FROM Kinosaal k", Kinosaal.class).getResultList();
CommandListKinosaalResponse response = new CommandListKinosaalResponse();
@@ -24,7 +22,6 @@ public class KinosaalProcessor implements CommandListKinosaalProcessor, CommandC
return response;
}
- @Transactional
public Command processCommandCreateKinosaal(CommandCreateKinosaal request) {
Kinosaal hall = new Kinosaal();
hall.setName(request.getName());
diff --git a/src/main/java/de/infinimotion/persistence/transaction/InitiateRollbackException.java b/src/main/java/de/infinimotion/persistence/transaction/InitiateRollbackException.java
new file mode 100644
index 0000000..2051fec
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/transaction/InitiateRollbackException.java
@@ -0,0 +1,4 @@
+package de.infinimotion.persistence.transaction;
+
+public class InitiateRollbackException extends Exception {
+}
diff --git a/src/main/java/de/infinimotion/persistence/transaction/RollbackProcessor.java b/src/main/java/de/infinimotion/persistence/transaction/RollbackProcessor.java
new file mode 100644
index 0000000..e125a0f
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/transaction/RollbackProcessor.java
@@ -0,0 +1,16 @@
+package de.infinimotion.persistence.transaction;
+
+import de.infinimotion.model.persistence.*;
+import io.quarkus.arc.Unremovable;
+import jakarta.enterprise.context.ApplicationScoped;
+
+@Unremovable
+@ApplicationScoped
+public class RollbackProcessor implements CommandRollbackProcessor {
+
+ @Override
+ public Command processCommandRollback(CommandRollback rollback) throws InitiateRollbackException {
+ throw new InitiateRollbackException();
+ }
+
+}
diff --git a/src/main/java/de/infinimotion/persistence/transaction/ThrowingRunnable.java b/src/main/java/de/infinimotion/persistence/transaction/ThrowingRunnable.java
new file mode 100644
index 0000000..ca3483e
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/transaction/ThrowingRunnable.java
@@ -0,0 +1,6 @@
+package de.infinimotion.persistence.transaction;
+
+@FunctionalInterface
+public interface ThrowingRunnable {
+ void run() throws Exception;
+}
diff --git a/src/main/java/de/infinimotion/persistence/transaction/Transaction.java b/src/main/java/de/infinimotion/persistence/transaction/Transaction.java
new file mode 100644
index 0000000..038590c
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/transaction/Transaction.java
@@ -0,0 +1,80 @@
+package de.infinimotion.persistence.transaction;
+
+import de.infinimotion.model.persistence.CommandWrapper;
+import io.quarkus.narayana.jta.QuarkusTransaction;
+import io.quarkus.narayana.jta.QuarkusTransactionException;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+public class Transaction extends Thread {
+
+ private static final Map TRANSACTIONS = new ConcurrentHashMap<>();
+
+ public static Transaction get(String id) {
+ Transaction tx = TRANSACTIONS.computeIfAbsent(id, Transaction::new);
+ if (!tx.isAlive()) {
+ tx.start();
+ }
+ return tx;
+ }
+
+ private final String id;
+ private final BlockingQueue tasks = new LinkedBlockingQueue<>();
+
+ @Setter
+ private boolean rollback = false;
+ private Emitter replyEmitter;
+ private CommandWrapper response;
+
+ private Transaction(String id) {
+ this.id = id;
+ }
+
+ @SneakyThrows
+ @Override
+ public void run() {
+ try {
+ QuarkusTransaction.disallowingExisting().call(() -> {
+ while (TRANSACTIONS.containsKey(id)) {
+ tasks.take().run();
+ }
+ if (rollback) {
+ throw new InitiateRollbackException();
+ }
+ return null;
+ });
+ } catch (QuarkusTransactionException e) {
+ if (!(e.getCause() instanceof InitiateRollbackException)) {
+ throw e;
+ }
+ } finally {
+ TRANSACTIONS.remove(id);
+ }
+
+ if (!rollback) {
+ replyEmitter.send(response);
+ }
+ }
+
+ public void queue(ThrowingRunnable task) {
+ tasks.add(task);
+ }
+
+ @SneakyThrows
+ public void commit(Emitter replyEmitter, CommandWrapper response) {
+ TRANSACTIONS.remove(id);
+ this.replyEmitter = replyEmitter;
+ this.response = response;
+ }
+
+ @SneakyThrows
+ public void rollback() {
+ rollback = true;
+ TRANSACTIONS.remove(id);
+ }
+
+}