diff --git a/pom.xml b/pom.xml
index 5d538fa..7c0304e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
de.infinimotion
model-persistence
- 0.0.3
+ 0.0.38
@@ -43,14 +43,6 @@
io.quarkus
quarkus-arc
-
- io.quarkus
- quarkus-rest
-
-
- io.quarkus
- quarkus-rest-jackson
-
io.quarkus
quarkus-messaging-kafka
@@ -59,6 +51,10 @@
io.quarkus
quarkus-hibernate-orm
+
+ io.quarkus
+ quarkus-jdbc-postgresql
+
diff --git a/src/main/java/de/infinimotion/persistence/CommandProcessor.java b/src/main/java/de/infinimotion/persistence/CommandProcessor.java
new file mode 100644
index 0000000..7cd247c
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/CommandProcessor.java
@@ -0,0 +1,58 @@
+package de.infinimotion.persistence;
+
+import de.infinimotion.model.persistence.*;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.persistence.EntityManager;
+import jakarta.transaction.Transactional;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+@ApplicationScoped
+public class CommandProcessor {
+
+ @Inject
+ EntityManager em;
+
+ private Map> processors
+ = Map.of(CommandListKinosaal.class.getSimpleName(), this::processListKinosaal,
+ CommandCreateKinosaal.class.getSimpleName(), this::processCreateKinosaal);
+
+ @Incoming("command")
+ @Outgoing("command-replies")
+ public CommandWrapper process(CommandWrapper request) throws IOException {
+ ThrowingFunction processor = processors.get(request.getType());
+ if (processor == null) {
+ CommandException e = new CommandException();
+ e.setException("unknown processor");
+ return e.serialize().copyIds(request);
+ }
+
+ return processor.apply(request).serialize().copyIds(request);
+ }
+
+ @Transactional
+ Command processListKinosaal(CommandWrapper wrapper) throws IOException {
+ CommandListKinosaal request = CommandListKinosaal.deserialize(wrapper);
+ List results = em.createQuery("SELECT k FROM Kinosaal k", Kinosaal.class).getResultList();
+ CommandListKinosaalResponse response = new CommandListKinosaalResponse();
+ response.setList(results);
+ return response;
+ }
+
+ @Transactional
+ Command processCreateKinosaal(CommandWrapper wrapper) throws IOException {
+ CommandCreateKinosaal request = CommandCreateKinosaal.deserialize(wrapper);
+ Kinosaal hall = new Kinosaal();
+ hall.setName(request.getName());
+ em.persist(hall);
+ CommandCreateKinosaalResponse response = new CommandCreateKinosaalResponse();
+ response.setHall(hall);
+ return response;
+ }
+
+}
diff --git a/src/main/java/de/infinimotion/persistence/GreetingResource.java b/src/main/java/de/infinimotion/persistence/GreetingResource.java
deleted file mode 100644
index 89cb233..0000000
--- a/src/main/java/de/infinimotion/persistence/GreetingResource.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package de.infinimotion.persistence;
-
-import de.infinimotion.persistence.Eintrittskarte;
-import io.smallrye.reactive.messaging.annotations.Incomings;
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
-import jakarta.ws.rs.GET;
-import jakarta.ws.rs.Path;
-import jakarta.ws.rs.QueryParam;
-import org.eclipse.microprofile.reactive.messaging.Channel;
-import org.eclipse.microprofile.reactive.messaging.Emitter;
-import org.eclipse.microprofile.reactive.messaging.Incoming;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-@ApplicationScoped
-@Path("/hello")
-public class GreetingResource {
-
- @Channel("test")
- Emitter testEmitter;
-
- @GET
- public Eintrittskarte hello(@QueryParam("code") String code) throws ExecutionException, InterruptedException, TimeoutException {
- Eintrittskarte ticket = new Eintrittskarte();
- ticket.setId((int) (Math.random() * Integer.MAX_VALUE));
- ticket.setCode(code);
- testEmitter.send(ticket.toString()).toCompletableFuture().get(10, TimeUnit.SECONDS);
- System.out.println("Sent " + ticket);
- return ticket;
- }
-
-}
diff --git a/src/main/java/de/infinimotion/persistence/ThrowingFunction.java b/src/main/java/de/infinimotion/persistence/ThrowingFunction.java
new file mode 100644
index 0000000..ec04237
--- /dev/null
+++ b/src/main/java/de/infinimotion/persistence/ThrowingFunction.java
@@ -0,0 +1,6 @@
+package de.infinimotion.persistence;
+
+@FunctionalInterface
+public interface ThrowingFunction {
+ R apply(T t) throws E;
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index b271a05..2de9d9f 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,7 +1,22 @@
-quarkus.http.port=8081
+quarkus.http.port=7081
+
+quarkus.index-dependency.model-persistence.group-id=de.infinimotion
+quarkus.index-dependency.model-persistence.artifact-id=model-persistence
+
%dev.kafka.bootstrap.servers=localhost:19092
+kafka.bootstrap.servers=redpanda-0:9092
+
kafka.security.protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=SCRAM-SHA-256
-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
-kafka.bootstrap.servers=redpanda-0:9092
-mp.messaging.outgoing.test.connector=smallrye-kafka
\ No newline at end of file
+kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
+
+mp.messaging.incoming.command.connector=smallrye-kafka
+mp.messaging.incoming.command.auto.offset.reset=earliest
+
+mp.messaging.outgoing.command-replies.connector=smallrye-kafka
+
+quarkus.hibernate-orm.schema-management.strategy=update
+quarkus.datasource.db-kind=postgresql
+quarkus.datasource.username=postgres
+quarkus.datasource.password=a552855c0d842e90895121cf614c31f950086cab
+quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/postgres
\ No newline at end of file