diff --git a/docker-compose.yml b/docker-compose.yml
index c291d92..258c1fb 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -27,8 +27,18 @@ services:
- redpanda_network
depends_on:
- redpanda-0
+ - postgres
+ #ports:
+ # - 8081:8081
+ postgres:
+ image: postgres
+ restart: always
+ # set shared memory limit when using docker compose
+ shm_size: 512mb
+ environment:
+ POSTGRES_PASSWORD: a552855c0d842e90895121cf614c31f950086cab
ports:
- - 8081:8081
+ - "127.0.0.1:5432:5432"
redpanda-0:
command:
- redpanda
diff --git a/pom.xml b/pom.xml
index b50ee48..62ef98d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
de.infinimotion
model-backend
- 0.0.2
+ 0.0.38
diff --git a/src/main/java/de/infinimotion/backend/BetterRequestReply.java b/src/main/java/de/infinimotion/backend/BetterRequestReply.java
new file mode 100644
index 0000000..a283b54
--- /dev/null
+++ b/src/main/java/de/infinimotion/backend/BetterRequestReply.java
@@ -0,0 +1,65 @@
+package de.infinimotion.backend;
+
+import de.infinimotion.model.backend.Command;
+import de.infinimotion.model.backend.CommandException;
+import de.infinimotion.model.backend.CommandWrapper;
+import jakarta.enterprise.context.ApplicationScoped;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+@ApplicationScoped
+public class BetterRequestReply {
+
+ @Channel("command")
+ Emitter emitter;
+
+ private static final Map> RESPONSES = new ConcurrentHashMap<>();
+
+ @Incoming("command-replies")
+ void processReply(CommandWrapper wrapper) {
+ AtomicReference responseRef = RESPONSES.get(wrapper.getRequest());
+ if (responseRef == null) {
+ return;
+ }
+
+ synchronized (responseRef) {
+ responseRef.set(wrapper);
+ responseRef.notifyAll();
+ }
+ }
+
+ CommandWrapper request(CommandWrapper wrapper) throws ExecutionException, InterruptedException, IOException {
+ wrapper.setRequest(UUID.randomUUID().toString());
+
+ AtomicReference responseRef = new AtomicReference<>();
+ synchronized (responseRef) {
+ RESPONSES.put(wrapper.getRequest(), responseRef);
+ try {
+ emitter.send(wrapper).toCompletableFuture().get();
+
+ CommandWrapper response;
+ do {
+ responseRef.wait();
+ response = responseRef.get();
+ } while (response == null);
+
+ if (CommandException.isType(response)) {
+ throw new RuntimeException(CommandException.deserialize(response).getException());
+ }
+ return response;
+ } finally {
+ RESPONSES.remove(wrapper.getRequest());
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/de/infinimotion/backend/GreetingResource.java b/src/main/java/de/infinimotion/backend/GreetingResource.java
deleted file mode 100644
index 774a60e..0000000
--- a/src/main/java/de/infinimotion/backend/GreetingResource.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package de.infinimotion.backend;
-
-import de.infinimotion.backend.Eintrittskarte;
-import io.smallrye.reactive.messaging.annotations.Incomings;
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.ws.rs.GET;
-import jakarta.ws.rs.Path;
-import org.eclipse.microprofile.reactive.messaging.Incoming;
-
-import java.util.logging.Logger;
-
-@ApplicationScoped
-@Path("/hello")
-public class GreetingResource {
-
- @GET
- public Eintrittskarte hello() {
- Eintrittskarte ticket = new Eintrittskarte();
- ticket.setId(123456);
- ticket.setCode("ABCDEF");
- return ticket;
- }
-
- @Incoming("test")
- public void consume(final String test) {
- System.out.println(test);
- }
-
-}
diff --git a/src/main/java/de/infinimotion/backend/RequestResource.java b/src/main/java/de/infinimotion/backend/RequestResource.java
new file mode 100644
index 0000000..6e0effd
--- /dev/null
+++ b/src/main/java/de/infinimotion/backend/RequestResource.java
@@ -0,0 +1,45 @@
+package de.infinimotion.backend;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import de.infinimotion.model.backend.*;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.QueryParam;
+import org.eclipse.microprofile.reactive.messaging.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+@ApplicationScoped
+@Path("/request")
+public class RequestResource {
+
+ @Inject
+ BetterRequestReply requester;
+
+ @GET
+ @Path("/kinosaal")
+ public List listKinosaal() throws IOException, ExecutionException, InterruptedException {
+ CommandListKinosaal request = new CommandListKinosaal();
+
+ CommandWrapper response = requester.request(request.serialize().generateIds());
+ return CommandListKinosaalResponse.deserialize(response).getList();
+ }
+
+ @GET
+ @Path("/kinosaal/create")
+ public Kinosaal createKinosaal(@QueryParam("name") String name) throws IOException, ExecutionException, InterruptedException {
+ CommandCreateKinosaal request = new CommandCreateKinosaal();
+ request.setName(name);
+
+ CommandWrapper response = requester.request(request.serialize().generateIds());
+ return CommandCreateKinosaalResponse.deserialize(response).getHall();
+ }
+
+}
diff --git a/src/main/java/de/infinimotion/backend/Utils.java b/src/main/java/de/infinimotion/backend/Utils.java
new file mode 100644
index 0000000..93e3784
--- /dev/null
+++ b/src/main/java/de/infinimotion/backend/Utils.java
@@ -0,0 +1,9 @@
+package de.infinimotion.backend;
+
+import de.infinimotion.model.backend.Command;
+
+import java.util.UUID;
+
+public class Utils {
+
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index e7b503b..bf6983c 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,8 +1,13 @@
-quarkus.http.port=8080
+quarkus.http.port=7080
+
%dev.kafka.bootstrap.servers=localhost:19092
+kafka.bootstrap.servers=redpanda-0:9092
+
kafka.security.protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=SCRAM-SHA-256
-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
-kafka.bootstrap.servers=redpanda-0:9092
-mp.messaging.incoming.test.connector=smallrye-kafka
-mp.messaging.incoming.test.auto.offset.reset=earliest
+kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
+
+mp.messaging.outgoing.command.connector=smallrye-kafka
+
+mp.messaging.incoming.command-replies.connector=smallrye-kafka
+mp.messaging.incoming.command-replies.auto.offset.reset=earliest