From a50503d2122d62bdbb085d71d3f0a997bfd5b062 Mon Sep 17 00:00:00 2001 From: Lennart Heinrich Date: Sat, 18 Oct 2025 00:50:34 +0200 Subject: [PATCH] Initial RequestResource --- docker-compose.yml | 12 +++- pom.xml | 2 +- .../backend/BetterRequestReply.java | 65 +++++++++++++++++++ .../backend/GreetingResource.java | 29 --------- .../infinimotion/backend/RequestResource.java | 45 +++++++++++++ .../java/de/infinimotion/backend/Utils.java | 9 +++ src/main/resources/application.properties | 15 +++-- 7 files changed, 141 insertions(+), 36 deletions(-) create mode 100644 src/main/java/de/infinimotion/backend/BetterRequestReply.java delete mode 100644 src/main/java/de/infinimotion/backend/GreetingResource.java create mode 100644 src/main/java/de/infinimotion/backend/RequestResource.java create mode 100644 src/main/java/de/infinimotion/backend/Utils.java diff --git a/docker-compose.yml b/docker-compose.yml index c291d92..258c1fb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,8 +27,18 @@ services: - redpanda_network depends_on: - redpanda-0 + - postgres + #ports: + # - 8081:8081 + postgres: + image: postgres + restart: always + # set shared memory limit when using docker compose + shm_size: 512mb + environment: + POSTGRES_PASSWORD: a552855c0d842e90895121cf614c31f950086cab ports: - - 8081:8081 + - "127.0.0.1:5432:5432" redpanda-0: command: - redpanda diff --git a/pom.xml b/pom.xml index b50ee48..62ef98d 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,7 @@ de.infinimotion model-backend - 0.0.2 + 0.0.38 diff --git a/src/main/java/de/infinimotion/backend/BetterRequestReply.java b/src/main/java/de/infinimotion/backend/BetterRequestReply.java new file mode 100644 index 0000000..a283b54 --- /dev/null +++ b/src/main/java/de/infinimotion/backend/BetterRequestReply.java @@ -0,0 +1,65 @@ +package de.infinimotion.backend; + +import de.infinimotion.model.backend.Command; +import de.infinimotion.model.backend.CommandException; +import de.infinimotion.model.backend.CommandWrapper; +import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +@ApplicationScoped +public class BetterRequestReply { + + @Channel("command") + Emitter emitter; + + private static final Map> RESPONSES = new ConcurrentHashMap<>(); + + @Incoming("command-replies") + void processReply(CommandWrapper wrapper) { + AtomicReference responseRef = RESPONSES.get(wrapper.getRequest()); + if (responseRef == null) { + return; + } + + synchronized (responseRef) { + responseRef.set(wrapper); + responseRef.notifyAll(); + } + } + + CommandWrapper request(CommandWrapper wrapper) throws ExecutionException, InterruptedException, IOException { + wrapper.setRequest(UUID.randomUUID().toString()); + + AtomicReference responseRef = new AtomicReference<>(); + synchronized (responseRef) { + RESPONSES.put(wrapper.getRequest(), responseRef); + try { + emitter.send(wrapper).toCompletableFuture().get(); + + CommandWrapper response; + do { + responseRef.wait(); + response = responseRef.get(); + } while (response == null); + + if (CommandException.isType(response)) { + throw new RuntimeException(CommandException.deserialize(response).getException()); + } + return response; + } finally { + RESPONSES.remove(wrapper.getRequest()); + } + } + } + +} diff --git a/src/main/java/de/infinimotion/backend/GreetingResource.java b/src/main/java/de/infinimotion/backend/GreetingResource.java deleted file mode 100644 index 774a60e..0000000 --- a/src/main/java/de/infinimotion/backend/GreetingResource.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.infinimotion.backend; - -import de.infinimotion.backend.Eintrittskarte; -import io.smallrye.reactive.messaging.annotations.Incomings; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import org.eclipse.microprofile.reactive.messaging.Incoming; - -import java.util.logging.Logger; - -@ApplicationScoped -@Path("/hello") -public class GreetingResource { - - @GET - public Eintrittskarte hello() { - Eintrittskarte ticket = new Eintrittskarte(); - ticket.setId(123456); - ticket.setCode("ABCDEF"); - return ticket; - } - - @Incoming("test") - public void consume(final String test) { - System.out.println(test); - } - -} diff --git a/src/main/java/de/infinimotion/backend/RequestResource.java b/src/main/java/de/infinimotion/backend/RequestResource.java new file mode 100644 index 0000000..6e0effd --- /dev/null +++ b/src/main/java/de/infinimotion/backend/RequestResource.java @@ -0,0 +1,45 @@ +package de.infinimotion.backend; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import de.infinimotion.model.backend.*; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.QueryParam; +import org.eclipse.microprofile.reactive.messaging.*; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; + +@ApplicationScoped +@Path("/request") +public class RequestResource { + + @Inject + BetterRequestReply requester; + + @GET + @Path("/kinosaal") + public List listKinosaal() throws IOException, ExecutionException, InterruptedException { + CommandListKinosaal request = new CommandListKinosaal(); + + CommandWrapper response = requester.request(request.serialize().generateIds()); + return CommandListKinosaalResponse.deserialize(response).getList(); + } + + @GET + @Path("/kinosaal/create") + public Kinosaal createKinosaal(@QueryParam("name") String name) throws IOException, ExecutionException, InterruptedException { + CommandCreateKinosaal request = new CommandCreateKinosaal(); + request.setName(name); + + CommandWrapper response = requester.request(request.serialize().generateIds()); + return CommandCreateKinosaalResponse.deserialize(response).getHall(); + } + +} diff --git a/src/main/java/de/infinimotion/backend/Utils.java b/src/main/java/de/infinimotion/backend/Utils.java new file mode 100644 index 0000000..93e3784 --- /dev/null +++ b/src/main/java/de/infinimotion/backend/Utils.java @@ -0,0 +1,9 @@ +package de.infinimotion.backend; + +import de.infinimotion.model.backend.Command; + +import java.util.UUID; + +public class Utils { + +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e7b503b..bf6983c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,8 +1,13 @@ -quarkus.http.port=8080 +quarkus.http.port=7080 + %dev.kafka.bootstrap.servers=localhost:19092 +kafka.bootstrap.servers=redpanda-0:9092 + kafka.security.protocol=SASL_PLAINTEXT kafka.sasl.mechanism=SCRAM-SHA-256 -kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab"; -kafka.bootstrap.servers=redpanda-0:9092 -mp.messaging.incoming.test.connector=smallrye-kafka -mp.messaging.incoming.test.auto.offset.reset=earliest +kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab"; + +mp.messaging.outgoing.command.connector=smallrye-kafka + +mp.messaging.incoming.command-replies.connector=smallrye-kafka +mp.messaging.incoming.command-replies.auto.offset.reset=earliest