diff --git a/pom.xml b/pom.xml
index 4175a62..e19f15e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
de.infinimotion
model-persistence
- 0.0.89
+ 0.0.101
@@ -60,6 +60,10 @@
lombok
1.18.32
+
+ io.quarkus
+ quarkus-mongodb-panache
+
diff --git a/src/main/java/de/infinimotion/persistence/CommandObserver.java b/src/main/java/de/infinimotion/persistence/CommandObserver.java
index 3348647..cf71684 100644
--- a/src/main/java/de/infinimotion/persistence/CommandObserver.java
+++ b/src/main/java/de/infinimotion/persistence/CommandObserver.java
@@ -1,9 +1,11 @@
package de.infinimotion.persistence;
import de.infinimotion.model.persistence.*;
+import de.infinimotion.persistence.processor.CommandResponseInterceptor;
import de.infinimotion.persistence.transaction.InitiateRollbackException;
import de.infinimotion.persistence.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
@@ -12,6 +14,9 @@ import org.eclipse.microprofile.reactive.messaging.Incoming;
@ApplicationScoped
public class CommandObserver {
+ @Inject
+ CommandResponseInterceptor interceptor;
+
@Channel("command-replies")
Emitter replyEmitter;
@@ -22,6 +27,7 @@ public class CommandObserver {
tx.queue(() -> {
try {
CommandWrapper response = request.process().serialize().copyIds(request);
+ interceptor.intercept(response);
if (request.isCommit()) {
tx.commit(replyEmitter, response);
} else {
diff --git a/src/main/java/de/infinimotion/persistence/processor/CommandListStatisticsProcessor.java b/src/main/java/de/infinimotion/persistence/processor/CommandListStatisticsProcessor.java
new file mode 100644
index 0000000..091a67a
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/processor/CommandListStatisticsProcessor.java
@@ -0,0 +1,20 @@
+package de.infinimotion.persistence.processor;
+
+import de.infinimotion.model.persistence.*;
+import io.quarkus.arc.Unremovable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.transaction.Transactional;
+
+@Unremovable
+@ApplicationScoped
+public class CommandListStatisticsProcessor implements de.infinimotion.model.persistence.CommandListStatisticsProcessor {
+
+ @Transactional(Transactional.TxType.NOT_SUPPORTED)
+ @Override
+ public CommandListStatisticsResponse processCommandListStatistics(CommandListStatistics commandListStatistics) {
+ CommandListStatisticsResponse resp = new CommandListStatisticsResponse();
+ resp.setMovies(StatisticsFilm.findAll().list());
+ resp.setShows(StatisticsVorstellung.findAll().list());
+ return resp;
+ }
+}
diff --git a/src/main/java/de/infinimotion/persistence/processor/CommandResponseInterceptor.java b/src/main/java/de/infinimotion/persistence/processor/CommandResponseInterceptor.java
new file mode 100644
index 0000000..936026b
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/processor/CommandResponseInterceptor.java
@@ -0,0 +1,160 @@
+package de.infinimotion.persistence.processor;
+
+import de.infinimotion.model.persistence.*;
+import io.vertx.core.Vertx;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Diese Klasse ist nicht besonders schön.
+ * Das liegt daran, dass die Verwendung von MongoDB nicht leicht in das bestehende Transaktionskonzept mit Postgres integriert werden kann.
+ * Die zusätzlichen Einschränkungen durch die Code-Generierung (sehr allgemeine Templates) würden eine schöne Lösung sehr kompliziert machen.
+ */
+@ApplicationScoped
+public class CommandResponseInterceptor {
+
+ @Inject
+ Vertx vertx;
+
+ public static final AtomicBoolean LOCK = new AtomicBoolean(true);
+
+ public void intercept(CommandWrapper response) {
+ vertx.executeBlocking(() -> {
+ synchronized (LOCK) {
+ if (response.getType().equals(CommandCreateEintrittskarteResponse.class.getSimpleName())) {
+ Eintrittskarte ticket = CommandCreateEintrittskarteResponse.deserialize(response).getEintrittskarte();
+ interceptCreate(ticket);
+ } else if (response.getType().equals(CommandUpdateBestellungResponse.class.getSimpleName())) {
+ Bestellung order = CommandUpdateBestellungResponse.deserialize(response).getBestellung();
+ interceptUpdate(order);
+ }
+ return null;
+ }
+ });
+
+ }
+
+ public void interceptCreate(Eintrittskarte ticket) {
+ int price = ticket.getSeat().getRow().getCategory().getPrice();
+ int showId = ticket.getShow().getId();
+ int movieId = ticket.getShow().getMovie().getId();
+ boolean active = checkOrderActive(ticket.getOrder());
+
+ Optional show = StatisticsVorstellung.find("show._id", showId).firstResultOptional();
+ Optional movie = StatisticsFilm.find("movie._id", movieId).firstResultOptional();
+
+ if (show.isEmpty()) {
+ StatisticsVorstellung newShow = new StatisticsVorstellung();
+ newShow.setShow(ticket.getShow());
+ newShow.setTickets(List.of(ticket));
+ if (active) {
+ newShow.setEarnings(price);
+ }
+ newShow.persist();
+ } else {
+ StatisticsVorstellung existingShow = show.orElseThrow();
+ existingShow.getTickets().add(ticket);
+ if (active) {
+ existingShow.setEarnings(existingShow.getEarnings() + price);
+ }
+ existingShow.update();
+ }
+
+ if (movie.isEmpty()) {
+ StatisticsFilm newMovie = new StatisticsFilm();
+ newMovie.setMovie(ticket.getShow().getMovie());
+ newMovie.setTickets(List.of(ticket));
+ if (active) {
+ newMovie.setEarnings(price);
+ }
+ newMovie.persist();
+ } else {
+ StatisticsFilm existingMovie = movie.orElseThrow();
+ existingMovie.getTickets().add(ticket);
+ if (active) {
+ existingMovie.setEarnings(existingMovie.getEarnings() + price);
+ }
+ existingMovie.update();
+ }
+ }
+
+// public void interceptUpdate(Eintrittskarte ticket) {
+// int price = ticket.getSeat().getRow().getCategory().getPrice();
+// int showId = ticket.getShow().getId();
+// int movieId = ticket.getShow().getMovie().getId();
+//
+// StatisticsVorstellung show = StatisticsVorstellung.find("show._id", showId).firstResult();
+// StatisticsFilm movie = StatisticsFilm.find("movie._id", movieId).firstResult();
+//
+// Optional existingTicketShow = show.getTickets().stream().filter(t -> Objects.equals(t.getId(), ticket.getId())).findAny();
+// int existingPriceShow = existingTicketShow
+// .map(Eintrittskarte::getSeat)
+// .map(Sitzplatz::getRow)
+// .map(Sitzreihe::getCategory)
+// .map(Sitzkategorie::getPrice)
+// .orElse(0);
+// show.getTickets().remove(existingTicketShow.orElse(null));
+// show.getTickets().add(ticket);
+// show.setEarnings(show.getEarnings() - existingPriceShow + price);
+// show.update();
+//
+// Optional existingTicketMovie = show.getTickets().stream().filter(t -> Objects.equals(t.getId(), ticket.getId())).findAny();
+// int existingPriceMovie = existingTicketMovie
+// .map(Eintrittskarte::getSeat)
+// .map(Sitzplatz::getRow)
+// .map(Sitzreihe::getCategory)
+// .map(Sitzkategorie::getPrice)
+// .orElse(0);
+// movie.getTickets().remove(existingTicketMovie.orElse(null));
+// movie.getTickets().add(ticket);
+// movie.setEarnings(movie.getEarnings() - existingPriceMovie + price);
+// movie.update();
+// }
+
+ public void interceptUpdate(Bestellung order) {
+ boolean orderActive = checkOrderActive(order);
+
+ Optional showOpt = StatisticsVorstellung.find("tickets.order._id", order.getId()).firstResultOptional();
+ if (showOpt.isPresent()) {
+ StatisticsVorstellung show = showOpt.orElseThrow();
+ show.getTickets().stream()
+ .filter(t -> t.getOrder().getId().equals(order.getId()))
+ .forEach(ticket -> {
+ boolean ticketActive = checkOrderActive(ticket.getOrder());
+ if (ticketActive && !orderActive) {
+ show.setEarnings(show.getEarnings() - ticket.getSeat().getRow().getCategory().getPrice());
+ } else if (!ticketActive && orderActive) {
+ show.setEarnings(show.getEarnings() + ticket.getSeat().getRow().getCategory().getPrice());
+ }
+ ticket.setOrder(order);
+ });
+ show.update();
+ }
+
+ Optional movieOpt = StatisticsFilm.find("tickets.order._id", order.getId()).firstResultOptional();
+ if (movieOpt.isPresent()) {
+ StatisticsFilm movie = movieOpt.orElseThrow();
+ movie.getTickets().stream()
+ .filter(t -> t.getOrder().getId().equals(order.getId()))
+ .forEach(ticket -> {
+ boolean ticketActive = checkOrderActive(ticket.getOrder());
+ if (ticketActive && !orderActive) {
+ movie.setEarnings(movie.getEarnings() - ticket.getSeat().getRow().getCategory().getPrice());
+ } else if (!ticketActive && orderActive) {
+ movie.setEarnings(movie.getEarnings() + ticket.getSeat().getRow().getCategory().getPrice());
+ }
+ ticket.setOrder(order);
+ });
+ movie.update();
+ }
+ }
+
+ private boolean checkOrderActive(Bestellung order) {
+ return order.getCancelled() == null && order.getBooked() != null;
+ }
+
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 84a2c4e..ea7669a 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -18,4 +18,8 @@ quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=postgres
quarkus.datasource.password=a552855c0d842e90895121cf614c31f950086cab
%dev.quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/postgres
-quarkus.datasource.jdbc.url=jdbc:postgresql://postgres:5432/postgres
\ No newline at end of file
+quarkus.datasource.jdbc.url=jdbc:postgresql://postgres:5432/postgres
+
+quarkus.mongodb.connection-string = mongodb://root:a552855c0d842e90895121cf614c31f950086cab@mongo:27017
+%dev.quarkus.mongodb.connection-string = mongodb://root:a552855c0d842e90895121cf614c31f950086cab@localhost:27017
+quarkus.mongodb.database = statistics
\ No newline at end of file