transactions

This commit is contained in:
2025-10-24 00:48:18 +02:00
parent 9d582793f4
commit 68f76aa244
7 changed files with 141 additions and 17 deletions

View File

@@ -35,7 +35,7 @@
<dependency> <dependency>
<groupId>de.infinimotion</groupId> <groupId>de.infinimotion</groupId>
<artifactId>model-persistence</artifactId> <artifactId>model-persistence</artifactId>
<version>0.0.44</version> <version>0.0.54</version>
</dependency> </dependency>
<!-- Quarkus --> <!-- Quarkus -->
@@ -55,6 +55,11 @@
<groupId>io.quarkus</groupId> <groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-postgresql</artifactId> <artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
</dependency>
<!-- Test --> <!-- Test -->
<dependency> <dependency>

View File

@@ -1,28 +1,44 @@
package de.infinimotion.persistence; package de.infinimotion.persistence;
import de.infinimotion.model.persistence.*; import de.infinimotion.model.persistence.*;
import de.infinimotion.persistence.transaction.InitiateRollbackException;
import de.infinimotion.persistence.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import java.io.IOException;
@ApplicationScoped @ApplicationScoped
public class CommandObserver { public class CommandObserver {
@Channel("command-replies")
Emitter<CommandWrapper> replyEmitter;
@Incoming("command") @Incoming("command")
@Outgoing("command-replies") public void process(CommandWrapper request) {
public CommandWrapper process(CommandWrapper request) throws IOException {
System.out.println(request); System.out.println(request);
try { Transaction tx = Transaction.get(request.getTransaction());
return request.process().serialize().copyIds(request); tx.queue(() -> {
} catch (Throwable t) { try {
t.printStackTrace(); CommandWrapper response = request.process().serialize().copyIds(request);
CommandException e = new CommandException(); if (request.isCommit()) {
e.setException(t.getMessage()); tx.commit(replyEmitter, response);
return e.serialize().copyIds(request); } else {
} replyEmitter.send(response);
}
} catch (InitiateRollbackException e) {
tx.rollback();
replyEmitter.send(new CommandRollbackResponse().serialize().copyIds(request));
} catch (Throwable t) {
tx.rollback();
t.printStackTrace();
CommandException e = new CommandException();
e.setException(t.getMessage());
replyEmitter.send(e.serialize().copyIds(request));
}
});
} }
} }

View File

@@ -5,7 +5,6 @@ import io.quarkus.arc.Unremovable;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManager;
import jakarta.transaction.Transactional;
import java.util.List; import java.util.List;
@@ -16,7 +15,6 @@ public class KinosaalProcessor implements CommandListKinosaalProcessor, CommandC
@Inject @Inject
EntityManager em; EntityManager em;
@Transactional
public Command processCommandListKinosaal(CommandListKinosaal request) { public Command processCommandListKinosaal(CommandListKinosaal request) {
List<Kinosaal> results = em.createQuery("SELECT k FROM Kinosaal k", Kinosaal.class).getResultList(); List<Kinosaal> results = em.createQuery("SELECT k FROM Kinosaal k", Kinosaal.class).getResultList();
CommandListKinosaalResponse response = new CommandListKinosaalResponse(); CommandListKinosaalResponse response = new CommandListKinosaalResponse();
@@ -24,7 +22,6 @@ public class KinosaalProcessor implements CommandListKinosaalProcessor, CommandC
return response; return response;
} }
@Transactional
public Command processCommandCreateKinosaal(CommandCreateKinosaal request) { public Command processCommandCreateKinosaal(CommandCreateKinosaal request) {
Kinosaal hall = new Kinosaal(); Kinosaal hall = new Kinosaal();
hall.setName(request.getName()); hall.setName(request.getName());

View File

@@ -0,0 +1,4 @@
package de.infinimotion.persistence.transaction;
public class InitiateRollbackException extends Exception {
}

View File

@@ -0,0 +1,16 @@
package de.infinimotion.persistence.transaction;
import de.infinimotion.model.persistence.*;
import io.quarkus.arc.Unremovable;
import jakarta.enterprise.context.ApplicationScoped;
@Unremovable
@ApplicationScoped
public class RollbackProcessor implements CommandRollbackProcessor {
@Override
public Command processCommandRollback(CommandRollback rollback) throws InitiateRollbackException {
throw new InitiateRollbackException();
}
}

View File

@@ -0,0 +1,6 @@
package de.infinimotion.persistence.transaction;
@FunctionalInterface
public interface ThrowingRunnable {
void run() throws Exception;
}

View File

@@ -0,0 +1,80 @@
package de.infinimotion.persistence.transaction;
import de.infinimotion.model.persistence.CommandWrapper;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.QuarkusTransactionException;
import lombok.Setter;
import lombok.SneakyThrows;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import java.util.Map;
import java.util.concurrent.*;
public class Transaction extends Thread {
private static final Map<String, Transaction> TRANSACTIONS = new ConcurrentHashMap<>();
public static Transaction get(String id) {
Transaction tx = TRANSACTIONS.computeIfAbsent(id, Transaction::new);
if (!tx.isAlive()) {
tx.start();
}
return tx;
}
private final String id;
private final BlockingQueue<ThrowingRunnable> tasks = new LinkedBlockingQueue<>();
@Setter
private boolean rollback = false;
private Emitter<CommandWrapper> replyEmitter;
private CommandWrapper response;
private Transaction(String id) {
this.id = id;
}
@SneakyThrows
@Override
public void run() {
try {
QuarkusTransaction.disallowingExisting().call(() -> {
while (TRANSACTIONS.containsKey(id)) {
tasks.take().run();
}
if (rollback) {
throw new InitiateRollbackException();
}
return null;
});
} catch (QuarkusTransactionException e) {
if (!(e.getCause() instanceof InitiateRollbackException)) {
throw e;
}
} finally {
TRANSACTIONS.remove(id);
}
if (!rollback) {
replyEmitter.send(response);
}
}
public void queue(ThrowingRunnable task) {
tasks.add(task);
}
@SneakyThrows
public void commit(Emitter<CommandWrapper> replyEmitter, CommandWrapper response) {
TRANSACTIONS.remove(id);
this.replyEmitter = replyEmitter;
this.response = response;
}
@SneakyThrows
public void rollback() {
rollback = true;
TRANSACTIONS.remove(id);
}
}