Red Hat OpenShift Cloud-basiertes Messaging mit Quarkus und AMQ Online

Hallo alle zusammen! Hier ist er - unser letzter Beitrag aus der Quarkus-Serie! ( Weitere Informationen finden Sie in unserem Webinar „Dies ist das native Java-Framework von Quarkus - Kubernetes“. Wir werden zeigen, wie Sie bei Null anfangen oder vorgefertigte Lösungen übertragen können.)



In einem früheren Beitrag haben wir uns die geeigneten Tools angesehen, mit denen Sie die Verbesserungen quantifizieren können, die durch die Modernisierung von Java erzielt wurden Anwendungen.

Ab Version 0.17.0 unterstützt Quarkus die Verwendung des Advanced Message Queuing Protocol ( AMQP ), eines offenen Standards für die Übertragung von Geschäftsnachrichten zwischen Anwendungen oder Organisationen.

Red Hat AMQ Online ist ein Dienst, der auf dem offenen EnMasse- Projekt basiert und einen Messaging-Mechanismus implementiert, der auf der Red Hat OpenShift- Plattform basiert . Weitere Informationen zur Funktionsweise finden Sie unter Hier (zu EN) . Heute werden wir zeigen, wie AMQ Online und Quarkus kombiniert werden können, um ein modernes OpenShift-basiertes Messaging-System mit zwei neuen Technologien für die Nachrichtenverarbeitung aufzubauen.

Es wird davon ausgegangen, dass Sie AMQ Online bereits auf der OpenShift-Plattform bereitgestellt haben (falls nicht, lesen Sie das Installationshandbuch ).

Zunächst erstellen wir die Quarkus-Anwendung, bei der es sich um ein einfaches Auftragsabwicklungssystem mit reaktivem Messaging handelt. Diese Anwendung enthält einen Auftragsgenerator, der Aufträge mit einem festen Intervall an die Nachrichtenwarteschlange sendet, sowie einen Auftragsprozessor, der Nachrichten aus der Warteschlange verarbeitet und Bestätigungen generiert, die im Browser angezeigt werden können.

Nach dem Erstellen der Anwendung zeigen wir, wie die Konfiguration des Messagingsystems darin implementiert wird, und verwenden AMQ Online, um die auf diesem System benötigten Ressourcen zu initialisieren.

Quarkus App


Unsere Quarkus-Anwendung läuft unter OpenShift und ist eine modifizierte Version von amqp-quickstart . Ein vollständiges Beispiel der Client-Seite finden Sie hier .

Generator bestellen


Der Generator sendet alle 5 Sekunden einfach monoton wachsende Auftragskennungen an die Adresse „Aufträge“.

@ApplicationScoped
public class OrderGenerator {
 
    private int orderId = 1;
 
    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
        .map(tick -> orderId++);
    }
}

Bestellabwickler


Der Auftragsabwickler ist noch einfacher, er gibt lediglich die Bestätigungskennung an die Adresse „Bestätigungen“ zurück.

@ApplicationScoped
public class OrderProcessor {
    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        //       <img draggable="false" class="emoji" alt=":-)" src="https://s.w.org/images/core/emoji/11.2.0/svg/1f642.svg">
        return order * 2;
    }
}

Bestätigungsressourcen


Eine Bestätigungsressource ist ein HTTP-Endpunkt zum Auflisten der von unserer Anwendung generierten Bestätigungen.

@Path("/confirmations")
public class ConfirmationResource {
 
    @Inject
    @Stream("confirmations") Publisher<Integer> orders;
 
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
 
 
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return orders;
    }
}

Anpassung


Um eine Verbindung zu AMQ Online herzustellen, benötigt unsere Anwendung einige Konfigurationsdaten, nämlich: Quarkus-Connector-Konfiguration, AMQP-Endpunktinformationen und Client-Anmeldeinformationen. Es ist natürlich besser, alle Konfigurationsdaten an einem Ort zu speichern, aber wir werden sie speziell trennen, um mögliche Optionen zum Einrichten der Quarkus-Anwendung aufzuzeigen.

Anschlüsse


Die Connector-Konfiguration kann in der Kompilierungsphase mithilfe der Anwendungseigenschaftendatei bereitgestellt werden:

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp

Um dies nicht zu erschweren, verwenden wir die Nachrichtenwarteschlange nur für die Adresse "Bestellungen". Und die Adresse "Bestätigungen" in unserer Anwendung verwendet die Warteschlange im Speicher.

Endpunkt AMQP


In der Kompilierungsphase sind der Hostname und die Portnummer für den AMQP-Endpunkt unbekannt, daher müssen sie implementiert werden. Der Endpunkt kann in der Konfigurationskarte festgelegt werden, die von AMQ Online erstellt wird. Daher definieren wir sie über die Umgebungsvariablen im Anwendungsmanifest:

spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp

Referenzen


Mit dem Dienstkonto-Token können Sie unsere Anwendung in OpenShift authentifizieren. Dazu müssen Sie zuerst eine benutzerdefinierte ConfigSource erstellen, die das Authentifizierungstoken aus dem Pod-Dateisystem liest:

public class MessagingCredentialsConfigSource implements ConfigSource {
    private static final Set<String> propertyNames;
 
    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }
 
    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }
 
    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
 
    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return null;
    }
 
    @Override
    public String getName() {
        return "messaging-credentials-config";
    }
 
    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}

Zusammenstellung und Bereitstellung der Anwendung


Da die Anwendung in eine ausführbare Datei kompiliert werden muss, ist eine virtuelle GraalVM-Maschine erforderlich. Weitere Informationen zum Einrichten einer Umgebung hierfür finden Sie in den entsprechenden Anweisungen im Quarkus-Handbuch .

Befolgen Sie dann die dortigen Anweisungen, um die Quelle herunterzuladen, unsere Anwendung zu erstellen und bereitzustellen:

git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply

Nach diesen Befehlen wird die Anwendung bereitgestellt, jedoch erst gestartet, wenn die erforderlichen Messaging-Ressourcen in AMQ Online konfiguriert wurden.

Messaging-Setup


Jetzt müssen die Ressourcen festgelegt werden, die unsere Anwendung im Messagingsystem benötigt. Erstellen Sie dazu: 1) den Adressraum zum Initialisieren des Endpunkts des Nachrichtensystems; 2) die Adresse zum Konfigurieren der Adressen, die wir in der Anwendung verwenden; 3) der Benutzer des Nachrichtensystems, um die Anmeldeinformationen des Clients festzulegen.

Adressraum


Das AddressSpace-Objekt in AMQ Online ist eine Gruppe von Adressen, die Verbindungsendpunkte sowie Authentifizierungs- und Autorisierungsrichtlinien gemeinsam nutzen. Beim Erstellen eines Adressraums können Sie angeben, wie die Endpunkte des Nachrichtensystems bereitgestellt werden sollen:

apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap

Adressen


Adressen werden zum Senden und Empfangen von Nachrichten verwendet. Jede Adresse hat einen Typ, der ihre Semantik definiert, sowie einen Plan, der die Anzahl der reservierten Ressourcen festlegt. Die Adresse kann zum Beispiel folgendermaßen bestimmt werden:

apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue

Messaging-Benutzer


Um Nachrichten an Ihre Adressen zu senden und zu empfangen, die nur vertrauenswürdige Anwendungen können, müssen Sie einen Benutzer im Nachrichtensystem erstellen. Für Anwendungen, die in einem Cluster ausgeführt werden, können Clients mithilfe eines OpenShift-Dienstkontos authentifiziert werden. Der Benutzer "serviceaccount" kann beispielsweise wie folgt definiert werden:

apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]

Berechtigungen zum Anpassen der Anwendung


Damit AMQ Online die Konfigurationszuordnung erstellen kann, mit der wir die AMQP-Endpunktinformationen implementiert haben, müssen Sie die Rolle und die Rollenbindung (Role and RoleBinding) angeben:

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
spec:
  rules:
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    verbs: [ "create" ]
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    resourceNames: [ "quarkus-config" ]
    verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra

So wenden Sie Konfigurationen an


Sie können die Messaging-Konfiguration folgendermaßen anwenden:

cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address

Antragsüberprüfung


Um sicherzustellen, dass die Anwendung gestartet wurde, prüfen wir zunächst, ob die entsprechenden Adressen erstellt und aktiv sind:

until [[ `oc get address quarkus-example.prices -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done

Überprüfen Sie dann die URL der Anwendungsroute (öffnen Sie einfach diese Adresse im Browser):

echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"

Im Browser sollte sichtbar sein, dass Tickets regelmäßig aktualisiert werden, wenn Nachrichten von AMQ Online gesendet und empfangen werden.

Zusammenfassen


Daher haben wir eine Quarkus-Anwendung geschrieben, die AMQP für Messaging verwendet, diese Anwendung für die Arbeit auf der Red Hat OpenShift-Plattform eingerichtet und ihre Konfiguration basierend auf der AMQ Online-Konfiguration implementiert. Anschließend haben wir die Manifeste erstellt, die zum Initialisieren des Messagingsystems für unsere Anwendung erforderlich sind.

Damit ist die Quarkus-Serie abgeschlossen, aber es liegen noch viele neue und interessante Dinge vor uns. Bleiben Sie bei uns!

All Articles