From 68f76aa244ddd45f683ea72ee5772249a199ebee Mon Sep 17 00:00:00 2001 From: Lennart Heinrich Date: Fri, 24 Oct 2025 00:48:18 +0200 Subject: [PATCH] transactions --- pom.xml | 7 +- .../persistence/CommandObserver.java | 42 +++++++--- .../processor/KinosaalProcessor.java | 3 - .../InitiateRollbackException.java | 4 + .../transaction/RollbackProcessor.java | 16 ++++ .../transaction/ThrowingRunnable.java | 6 ++ .../persistence/transaction/Transaction.java | 80 +++++++++++++++++++ 7 files changed, 141 insertions(+), 17 deletions(-) create mode 100644 src/main/java/de/infinimotion/persistence/transaction/InitiateRollbackException.java create mode 100644 src/main/java/de/infinimotion/persistence/transaction/RollbackProcessor.java create mode 100644 src/main/java/de/infinimotion/persistence/transaction/ThrowingRunnable.java create mode 100644 src/main/java/de/infinimotion/persistence/transaction/Transaction.java diff --git a/pom.xml b/pom.xml index 4160cb3..3178346 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,7 @@ de.infinimotion model-persistence - 0.0.44 + 0.0.54 @@ -55,6 +55,11 @@ io.quarkus quarkus-jdbc-postgresql + + org.projectlombok + lombok + 1.18.32 + diff --git a/src/main/java/de/infinimotion/persistence/CommandObserver.java b/src/main/java/de/infinimotion/persistence/CommandObserver.java index e7b6b61..3348647 100644 --- a/src/main/java/de/infinimotion/persistence/CommandObserver.java +++ b/src/main/java/de/infinimotion/persistence/CommandObserver.java @@ -1,28 +1,44 @@ package de.infinimotion.persistence; import de.infinimotion.model.persistence.*; +import de.infinimotion.persistence.transaction.InitiateRollbackException; +import de.infinimotion.persistence.transaction.Transaction; import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.eclipse.microprofile.reactive.messaging.Outgoing; - -import java.io.IOException; @ApplicationScoped public class CommandObserver { + @Channel("command-replies") + Emitter replyEmitter; + @Incoming("command") - @Outgoing("command-replies") - public CommandWrapper process(CommandWrapper request) throws IOException { + public void process(CommandWrapper request) { System.out.println(request); - try { - return request.process().serialize().copyIds(request); - } catch (Throwable t) { - t.printStackTrace(); - CommandException e = new CommandException(); - e.setException(t.getMessage()); - return e.serialize().copyIds(request); - } + Transaction tx = Transaction.get(request.getTransaction()); + tx.queue(() -> { + try { + CommandWrapper response = request.process().serialize().copyIds(request); + if (request.isCommit()) { + tx.commit(replyEmitter, response); + } else { + replyEmitter.send(response); + } + } catch (InitiateRollbackException e) { + tx.rollback(); + replyEmitter.send(new CommandRollbackResponse().serialize().copyIds(request)); + } catch (Throwable t) { + tx.rollback(); + t.printStackTrace(); + CommandException e = new CommandException(); + e.setException(t.getMessage()); + replyEmitter.send(e.serialize().copyIds(request)); + } + }); } + } diff --git a/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java b/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java index f643db0..828dd15 100644 --- a/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java +++ b/src/main/java/de/infinimotion/persistence/processor/KinosaalProcessor.java @@ -5,7 +5,6 @@ import io.quarkus.arc.Unremovable; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.persistence.EntityManager; -import jakarta.transaction.Transactional; import java.util.List; @@ -16,7 +15,6 @@ public class KinosaalProcessor implements CommandListKinosaalProcessor, CommandC @Inject EntityManager em; - @Transactional public Command processCommandListKinosaal(CommandListKinosaal request) { List results = em.createQuery("SELECT k FROM Kinosaal k", Kinosaal.class).getResultList(); CommandListKinosaalResponse response = new CommandListKinosaalResponse(); @@ -24,7 +22,6 @@ public class KinosaalProcessor implements CommandListKinosaalProcessor, CommandC return response; } - @Transactional public Command processCommandCreateKinosaal(CommandCreateKinosaal request) { Kinosaal hall = new Kinosaal(); hall.setName(request.getName()); diff --git a/src/main/java/de/infinimotion/persistence/transaction/InitiateRollbackException.java b/src/main/java/de/infinimotion/persistence/transaction/InitiateRollbackException.java new file mode 100644 index 0000000..2051fec --- /dev/null +++ b/src/main/java/de/infinimotion/persistence/transaction/InitiateRollbackException.java @@ -0,0 +1,4 @@ +package de.infinimotion.persistence.transaction; + +public class InitiateRollbackException extends Exception { +} diff --git a/src/main/java/de/infinimotion/persistence/transaction/RollbackProcessor.java b/src/main/java/de/infinimotion/persistence/transaction/RollbackProcessor.java new file mode 100644 index 0000000..e125a0f --- /dev/null +++ b/src/main/java/de/infinimotion/persistence/transaction/RollbackProcessor.java @@ -0,0 +1,16 @@ +package de.infinimotion.persistence.transaction; + +import de.infinimotion.model.persistence.*; +import io.quarkus.arc.Unremovable; +import jakarta.enterprise.context.ApplicationScoped; + +@Unremovable +@ApplicationScoped +public class RollbackProcessor implements CommandRollbackProcessor { + + @Override + public Command processCommandRollback(CommandRollback rollback) throws InitiateRollbackException { + throw new InitiateRollbackException(); + } + +} diff --git a/src/main/java/de/infinimotion/persistence/transaction/ThrowingRunnable.java b/src/main/java/de/infinimotion/persistence/transaction/ThrowingRunnable.java new file mode 100644 index 0000000..ca3483e --- /dev/null +++ b/src/main/java/de/infinimotion/persistence/transaction/ThrowingRunnable.java @@ -0,0 +1,6 @@ +package de.infinimotion.persistence.transaction; + +@FunctionalInterface +public interface ThrowingRunnable { + void run() throws Exception; +} diff --git a/src/main/java/de/infinimotion/persistence/transaction/Transaction.java b/src/main/java/de/infinimotion/persistence/transaction/Transaction.java new file mode 100644 index 0000000..038590c --- /dev/null +++ b/src/main/java/de/infinimotion/persistence/transaction/Transaction.java @@ -0,0 +1,80 @@ +package de.infinimotion.persistence.transaction; + +import de.infinimotion.model.persistence.CommandWrapper; +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.narayana.jta.QuarkusTransactionException; +import lombok.Setter; +import lombok.SneakyThrows; +import org.eclipse.microprofile.reactive.messaging.Emitter; + +import java.util.Map; +import java.util.concurrent.*; + +public class Transaction extends Thread { + + private static final Map TRANSACTIONS = new ConcurrentHashMap<>(); + + public static Transaction get(String id) { + Transaction tx = TRANSACTIONS.computeIfAbsent(id, Transaction::new); + if (!tx.isAlive()) { + tx.start(); + } + return tx; + } + + private final String id; + private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + + @Setter + private boolean rollback = false; + private Emitter replyEmitter; + private CommandWrapper response; + + private Transaction(String id) { + this.id = id; + } + + @SneakyThrows + @Override + public void run() { + try { + QuarkusTransaction.disallowingExisting().call(() -> { + while (TRANSACTIONS.containsKey(id)) { + tasks.take().run(); + } + if (rollback) { + throw new InitiateRollbackException(); + } + return null; + }); + } catch (QuarkusTransactionException e) { + if (!(e.getCause() instanceof InitiateRollbackException)) { + throw e; + } + } finally { + TRANSACTIONS.remove(id); + } + + if (!rollback) { + replyEmitter.send(response); + } + } + + public void queue(ThrowingRunnable task) { + tasks.add(task); + } + + @SneakyThrows + public void commit(Emitter replyEmitter, CommandWrapper response) { + TRANSACTIONS.remove(id); + this.replyEmitter = replyEmitter; + this.response = response; + } + + @SneakyThrows + public void rollback() { + rollback = true; + TRANSACTIONS.remove(id); + } + +}