Initial CommandProcessor
This commit is contained in:
14
pom.xml
14
pom.xml
@@ -35,7 +35,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>de.infinimotion</groupId>
|
<groupId>de.infinimotion</groupId>
|
||||||
<artifactId>model-persistence</artifactId>
|
<artifactId>model-persistence</artifactId>
|
||||||
<version>0.0.3</version>
|
<version>0.0.38</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- Quarkus -->
|
<!-- Quarkus -->
|
||||||
@@ -43,14 +43,6 @@
|
|||||||
<groupId>io.quarkus</groupId>
|
<groupId>io.quarkus</groupId>
|
||||||
<artifactId>quarkus-arc</artifactId>
|
<artifactId>quarkus-arc</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>io.quarkus</groupId>
|
|
||||||
<artifactId>quarkus-rest</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>io.quarkus</groupId>
|
|
||||||
<artifactId>quarkus-rest-jackson</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.quarkus</groupId>
|
<groupId>io.quarkus</groupId>
|
||||||
<artifactId>quarkus-messaging-kafka</artifactId>
|
<artifactId>quarkus-messaging-kafka</artifactId>
|
||||||
@@ -59,6 +51,10 @@
|
|||||||
<groupId>io.quarkus</groupId>
|
<groupId>io.quarkus</groupId>
|
||||||
<artifactId>quarkus-hibernate-orm</artifactId>
|
<artifactId>quarkus-hibernate-orm</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.quarkus</groupId>
|
||||||
|
<artifactId>quarkus-jdbc-postgresql</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Test -->
|
<!-- Test -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package de.infinimotion.persistence;
|
||||||
|
|
||||||
|
import de.infinimotion.model.persistence.*;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.persistence.EntityManager;
|
||||||
|
import jakarta.transaction.Transactional;
|
||||||
|
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
||||||
|
import org.eclipse.microprofile.reactive.messaging.Outgoing;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ApplicationScoped
|
||||||
|
public class CommandProcessor {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
EntityManager em;
|
||||||
|
|
||||||
|
private Map<String, ThrowingFunction<CommandWrapper, Command, IOException>> processors
|
||||||
|
= Map.of(CommandListKinosaal.class.getSimpleName(), this::processListKinosaal,
|
||||||
|
CommandCreateKinosaal.class.getSimpleName(), this::processCreateKinosaal);
|
||||||
|
|
||||||
|
@Incoming("command")
|
||||||
|
@Outgoing("command-replies")
|
||||||
|
public CommandWrapper process(CommandWrapper request) throws IOException {
|
||||||
|
ThrowingFunction<CommandWrapper, Command, IOException> processor = processors.get(request.getType());
|
||||||
|
if (processor == null) {
|
||||||
|
CommandException e = new CommandException();
|
||||||
|
e.setException("unknown processor");
|
||||||
|
return e.serialize().copyIds(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
return processor.apply(request).serialize().copyIds(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
Command processListKinosaal(CommandWrapper wrapper) throws IOException {
|
||||||
|
CommandListKinosaal request = CommandListKinosaal.deserialize(wrapper);
|
||||||
|
List<Kinosaal> results = em.createQuery("SELECT k FROM Kinosaal k", Kinosaal.class).getResultList();
|
||||||
|
CommandListKinosaalResponse response = new CommandListKinosaalResponse();
|
||||||
|
response.setList(results);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
Command processCreateKinosaal(CommandWrapper wrapper) throws IOException {
|
||||||
|
CommandCreateKinosaal request = CommandCreateKinosaal.deserialize(wrapper);
|
||||||
|
Kinosaal hall = new Kinosaal();
|
||||||
|
hall.setName(request.getName());
|
||||||
|
em.persist(hall);
|
||||||
|
CommandCreateKinosaalResponse response = new CommandCreateKinosaalResponse();
|
||||||
|
response.setHall(hall);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,36 +0,0 @@
|
|||||||
package de.infinimotion.persistence;
|
|
||||||
|
|
||||||
import de.infinimotion.persistence.Eintrittskarte;
|
|
||||||
import io.smallrye.reactive.messaging.annotations.Incomings;
|
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
|
||||||
import jakarta.inject.Inject;
|
|
||||||
import jakarta.ws.rs.GET;
|
|
||||||
import jakarta.ws.rs.Path;
|
|
||||||
import jakarta.ws.rs.QueryParam;
|
|
||||||
import org.eclipse.microprofile.reactive.messaging.Channel;
|
|
||||||
import org.eclipse.microprofile.reactive.messaging.Emitter;
|
|
||||||
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
|
||||||
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.logging.Logger;
|
|
||||||
|
|
||||||
@ApplicationScoped
|
|
||||||
@Path("/hello")
|
|
||||||
public class GreetingResource {
|
|
||||||
|
|
||||||
@Channel("test")
|
|
||||||
Emitter<String> testEmitter;
|
|
||||||
|
|
||||||
@GET
|
|
||||||
public Eintrittskarte hello(@QueryParam("code") String code) throws ExecutionException, InterruptedException, TimeoutException {
|
|
||||||
Eintrittskarte ticket = new Eintrittskarte();
|
|
||||||
ticket.setId((int) (Math.random() * Integer.MAX_VALUE));
|
|
||||||
ticket.setCode(code);
|
|
||||||
testEmitter.send(ticket.toString()).toCompletableFuture().get(10, TimeUnit.SECONDS);
|
|
||||||
System.out.println("Sent " + ticket);
|
|
||||||
return ticket;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,6 @@
|
|||||||
|
package de.infinimotion.persistence;
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface ThrowingFunction<T, R, E extends Exception> {
|
||||||
|
R apply(T t) throws E;
|
||||||
|
}
|
||||||
@@ -1,7 +1,22 @@
|
|||||||
quarkus.http.port=8081
|
quarkus.http.port=7081
|
||||||
|
|
||||||
|
quarkus.index-dependency.model-persistence.group-id=de.infinimotion
|
||||||
|
quarkus.index-dependency.model-persistence.artifact-id=model-persistence
|
||||||
|
|
||||||
%dev.kafka.bootstrap.servers=localhost:19092
|
%dev.kafka.bootstrap.servers=localhost:19092
|
||||||
|
kafka.bootstrap.servers=redpanda-0:9092
|
||||||
|
|
||||||
kafka.security.protocol=SASL_PLAINTEXT
|
kafka.security.protocol=SASL_PLAINTEXT
|
||||||
kafka.sasl.mechanism=SCRAM-SHA-256
|
kafka.sasl.mechanism=SCRAM-SHA-256
|
||||||
kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
|
kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="im-admin" password="a552855c0d842e90895121cf614c31f950086cab";
|
||||||
kafka.bootstrap.servers=redpanda-0:9092
|
|
||||||
mp.messaging.outgoing.test.connector=smallrye-kafka
|
mp.messaging.incoming.command.connector=smallrye-kafka
|
||||||
|
mp.messaging.incoming.command.auto.offset.reset=earliest
|
||||||
|
|
||||||
|
mp.messaging.outgoing.command-replies.connector=smallrye-kafka
|
||||||
|
|
||||||
|
quarkus.hibernate-orm.schema-management.strategy=update
|
||||||
|
quarkus.datasource.db-kind=postgresql
|
||||||
|
quarkus.datasource.username=postgres
|
||||||
|
quarkus.datasource.password=a552855c0d842e90895121cf614c31f950086cab
|
||||||
|
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/postgres
|
||||||
Reference in New Issue
Block a user