Initial RequestResource
This commit is contained in:
@@ -0,0 +1,65 @@
|
||||
package de.infinimotion.backend;
|
||||
|
||||
import de.infinimotion.model.backend.Command;
|
||||
import de.infinimotion.model.backend.CommandException;
|
||||
import de.infinimotion.model.backend.CommandWrapper;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.eclipse.microprofile.reactive.messaging.Channel;
|
||||
import org.eclipse.microprofile.reactive.messaging.Emitter;
|
||||
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ApplicationScoped
|
||||
public class BetterRequestReply {
|
||||
|
||||
@Channel("command")
|
||||
Emitter<CommandWrapper> emitter;
|
||||
|
||||
private static final Map<String, AtomicReference<CommandWrapper>> RESPONSES = new ConcurrentHashMap<>();
|
||||
|
||||
@Incoming("command-replies")
|
||||
void processReply(CommandWrapper wrapper) {
|
||||
AtomicReference<CommandWrapper> responseRef = RESPONSES.get(wrapper.getRequest());
|
||||
if (responseRef == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (responseRef) {
|
||||
responseRef.set(wrapper);
|
||||
responseRef.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
CommandWrapper request(CommandWrapper wrapper) throws ExecutionException, InterruptedException, IOException {
|
||||
wrapper.setRequest(UUID.randomUUID().toString());
|
||||
|
||||
AtomicReference<CommandWrapper> responseRef = new AtomicReference<>();
|
||||
synchronized (responseRef) {
|
||||
RESPONSES.put(wrapper.getRequest(), responseRef);
|
||||
try {
|
||||
emitter.send(wrapper).toCompletableFuture().get();
|
||||
|
||||
CommandWrapper response;
|
||||
do {
|
||||
responseRef.wait();
|
||||
response = responseRef.get();
|
||||
} while (response == null);
|
||||
|
||||
if (CommandException.isType(response)) {
|
||||
throw new RuntimeException(CommandException.deserialize(response).getException());
|
||||
}
|
||||
return response;
|
||||
} finally {
|
||||
RESPONSES.remove(wrapper.getRequest());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package de.infinimotion.backend;
|
||||
|
||||
import de.infinimotion.backend.Eintrittskarte;
|
||||
import io.smallrye.reactive.messaging.annotations.Incomings;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.Path;
|
||||
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
||||
|
||||
import java.util.logging.Logger;
|
||||
|
||||
@ApplicationScoped
|
||||
@Path("/hello")
|
||||
public class GreetingResource {
|
||||
|
||||
@GET
|
||||
public Eintrittskarte hello() {
|
||||
Eintrittskarte ticket = new Eintrittskarte();
|
||||
ticket.setId(123456);
|
||||
ticket.setCode("ABCDEF");
|
||||
return ticket;
|
||||
}
|
||||
|
||||
@Incoming("test")
|
||||
public void consume(final String test) {
|
||||
System.out.println(test);
|
||||
}
|
||||
|
||||
}
|
||||
45
src/main/java/de/infinimotion/backend/RequestResource.java
Normal file
45
src/main/java/de/infinimotion/backend/RequestResource.java
Normal file
@@ -0,0 +1,45 @@
|
||||
package de.infinimotion.backend;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import de.infinimotion.model.backend.*;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.Path;
|
||||
import jakarta.ws.rs.QueryParam;
|
||||
import org.eclipse.microprofile.reactive.messaging.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@ApplicationScoped
|
||||
@Path("/request")
|
||||
public class RequestResource {
|
||||
|
||||
@Inject
|
||||
BetterRequestReply requester;
|
||||
|
||||
@GET
|
||||
@Path("/kinosaal")
|
||||
public List<Kinosaal> listKinosaal() throws IOException, ExecutionException, InterruptedException {
|
||||
CommandListKinosaal request = new CommandListKinosaal();
|
||||
|
||||
CommandWrapper response = requester.request(request.serialize().generateIds());
|
||||
return CommandListKinosaalResponse.deserialize(response).getList();
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/kinosaal/create")
|
||||
public Kinosaal createKinosaal(@QueryParam("name") String name) throws IOException, ExecutionException, InterruptedException {
|
||||
CommandCreateKinosaal request = new CommandCreateKinosaal();
|
||||
request.setName(name);
|
||||
|
||||
CommandWrapper response = requester.request(request.serialize().generateIds());
|
||||
return CommandCreateKinosaalResponse.deserialize(response).getHall();
|
||||
}
|
||||
|
||||
}
|
||||
9
src/main/java/de/infinimotion/backend/Utils.java
Normal file
9
src/main/java/de/infinimotion/backend/Utils.java
Normal file
@@ -0,0 +1,9 @@
|
||||
package de.infinimotion.backend;
|
||||
|
||||
import de.infinimotion.model.backend.Command;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public class Utils {
|
||||
|
||||
}
|
||||
@@ -1,8 +1,13 @@
|
||||
quarkus.http.port=8080
|
||||
quarkus.http.port=7080
|
||||
|
||||
%dev.kafka.bootstrap.servers=localhost:19092
|
||||
kafka.bootstrap.servers=redpanda-0:9092
|
||||
|
||||
kafka.security.protocol=SASL_PLAINTEXT
|
||||
kafka.sasl.mechanism=SCRAM-SHA-256
|
||||
kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
|
||||
kafka.bootstrap.servers=redpanda-0:9092
|
||||
mp.messaging.incoming.test.connector=smallrye-kafka
|
||||
mp.messaging.incoming.test.auto.offset.reset=earliest
|
||||
kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
|
||||
|
||||
mp.messaging.outgoing.command.connector=smallrye-kafka
|
||||
|
||||
mp.messaging.incoming.command-replies.connector=smallrye-kafka
|
||||
mp.messaging.incoming.command-replies.auto.offset.reset=earliest
|
||||
|
||||
Reference in New Issue
Block a user