Red Hat OpenShift Cloud-Based Messaging Using Quarkus and AMQ Online

Hello everyone! Here it is: the final post in our Quarkus series. (By the way, see our webinar “This is Quarkus, a Kubernetes-native Java framework”, where we show how to start from scratch or migrate existing solutions.)



In the previous post we looked at tools you can use to quantify the improvements gained by modernizing Java applications.

Starting with version 0.17.0, Quarkus supports the Advanced Message Queuing Protocol (AMQP), an open standard for transferring business messages between applications or organizations.

Red Hat AMQ Online is a service built on the open source EnMasse project that provides messaging on the Red Hat OpenShift platform. To learn more about how it works, see here (in English). Today we show how to combine AMQ Online and Quarkus to build a modern OpenShift-based messaging system using these two new messaging technologies.

We assume that you have already deployed AMQ Online on the OpenShift platform (if not, see the installation guide).

To get started, we will create a Quarkus application: a simple order processing system that uses reactive messaging. The application includes an order generator, which sends orders to a message queue at a fixed interval, and an order processor, which consumes messages from the queue and generates confirmations that can be viewed in a browser.

After creating the application, we will show how to implement the messaging system configuration in it and use AMQ Online to initialize the resources we need on this system.

Quarkus app


Our Quarkus application runs on OpenShift and is a modified version of amqp-quickstart. A complete example of the client side can be found here.

Order generator


Every 5 seconds, the generator sends a monotonically increasing order identifier to the “orders” address.

@ApplicationScoped
public class OrderGenerator {
 
    private int orderId = 1;
 
    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
        .map(tick -> orderId++);
    }
}

Order processor


The order processor is even simpler: for each order it receives, it returns a confirmation identifier, which is sent to the “confirmations” address.

@ApplicationScoped
public class OrderProcessor {
    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        // The confirmation id is simply twice the order id :-)
        return order * 2;
    }
}
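
To make the data flow concrete, here is a minimal plain-JDK sketch of what happens to each order as it passes from the generator through the processor. It does not use Quarkus, reactive messaging, or AMQP; the class name OrderFlowSketch and the method confirmationsFor are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-JDK simulation of: generator -> "orders" -> processor -> "confirmations".
// Hypothetical helper, not part of the example client.
class OrderFlowSketch {

    // Mirrors OrderProcessor.process: the confirmation is twice the order id.
    static int process(int order) {
        return order * 2;
    }

    // Generates order ids 1..count (one per timer tick) and processes each of them.
    static List<Integer> confirmationsFor(int count) {
        return IntStream.rangeClosed(1, count)
                .map(OrderFlowSketch::process)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(confirmationsFor(4)); // prints [2, 4, 6, 8]
    }
}
```

In the real application the two steps are decoupled by the “orders” queue, so the processor can run at its own pace and in a separate process from the generator.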

Confirmation resource


A confirmation resource is an HTTP endpoint for listing the confirmations generated by our application.

@Path("/confirmations")
public class ConfirmationResource {
 
    @Inject
    @Stream("confirmations") Publisher<Integer> orders;
 
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
 
 
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return orders;
    }
}

Configuration


To connect to AMQ Online, our application needs some configuration data: the Quarkus connector configuration, the AMQP endpoint information, and the client credentials. It is of course better to keep all configuration data in one place, but we deliberately split it up here to show the different ways of configuring a Quarkus application.

Connectors


The connector configuration can be provided at build time in the application properties file:

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp

To keep things simple, we use the message queue only for the “orders” address. The “confirmations” address uses an in-memory queue inside the application.
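
The property keys above follow the pattern mp.messaging.<direction>.<channel>.connector. As a rough sketch of how such keys can be read, the following plain-JDK code extracts the direction, channel, and connector name from a properties text. It is not how Quarkus itself processes the file; the class ChannelConnectorSketch and its method are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: lists which channels are bound to which connector.
class ChannelConnectorSketch {
    private static final String PREFIX = "mp.messaging.";
    private static final String SUFFIX = ".connector";

    // Returns "<direction>.<channel>" -> connector name for every matching key.
    static Map<String, String> connectors(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            boolean matches = key.startsWith(PREFIX) && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length();
            if (matches) {
                String channel = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                result.put(channel, props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String text = "mp.messaging.outgoing.orders.connector=smallrye-amqp\n"
                + "mp.messaging.incoming.orders.connector=smallrye-amqp\n";
        // prints {incoming.orders=smallrye-amqp, outgoing.orders=smallrye-amqp}
        System.out.println(connectors(text));
    }
}
```

Note that “confirmations” does not appear in the result: a channel with no connector entry stays inside the application, which matches the in-memory behavior described above.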

AMQP endpoint


The host name and port number of the AMQP endpoint are not known at build time, so they must be injected at run time. The endpoint information is available in the configmap that AMQ Online creates, so we pass it to the application through environment variables in the application manifest:

spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp
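
MicroProfile Config, which Quarkus uses, can resolve a property from an environment variable whose name is the property name with every non-alphanumeric character replaced by an underscore and then upper-cased. The sketch below illustrates that name mapping under the assumption that the connector reads properties called amqp-host and amqp-port (by analogy with the amqp-username and amqp-password properties used later in this post). The class EnvNameSketch, the host value, and the fallback port are made up for illustration.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper showing the property-name to environment-variable mapping.
class EnvNameSketch {

    // "amqp-host" -> "AMQP_HOST": replace anything that is not a letter,
    // digit or underscore with '_', then upper-case the result.
    static String toEnvName(String propertyName) {
        return propertyName.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    // Looks the property up in a map standing in for System.getenv(),
    // falling back to a default when the variable is not set.
    static String resolve(Map<String, String> env, String propertyName, String defaultValue) {
        String value = env.get(toEnvName(propertyName));
        return value != null ? value : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("AMQP_HOST", "messaging.example.svc"); // made-up value

        System.out.println(toEnvName("amqp-host"));                 // prints AMQP_HOST
        System.out.println(resolve(env, "amqp-host", "localhost")); // prints messaging.example.svc
        System.out.println(resolve(env, "amqp-port", "5672"));      // prints 5672
    }
}
```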

Credentials


The OpenShift service account token can be used to authenticate our application. To do this, first create a custom ConfigSource that reads the authentication token from the pod's file system:

public class MessagingCredentialsConfigSource implements ConfigSource {
    private static final Set<String> propertyNames;
 
    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }
 
    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }
 
    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
 
    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return null;
    }
 
    @Override
    public String getName() {
        return "messaging-credentials-config";
    }
 
    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}
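
Because the token path is hard-coded, the class above is awkward to try outside a pod. One possible variation, shown here only as a sketch, takes the path as a parameter so the lookup can be exercised against a temporary file. The class TokenCredentialsSketch and its methods are hypothetical, and trimming the token is an extra assumption that the original class does not make.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical variation of the credential lookup with a configurable token path.
class TokenCredentialsSketch {

    // Reads the token as UTF-8. Trimming (an assumption, not in the original)
    // removes surrounding whitespace such as a trailing newline.
    static String readToken(Path tokenFile) {
        try {
            return new String(Files.readAllBytes(tokenFile), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds the same two properties that the ConfigSource exposes.
    static Map<String, String> credentials(Path tokenFile) {
        Map<String, String> properties = new HashMap<>();
        properties.put("amqp-username", "@@serviceaccount@@");
        properties.put("amqp-password", readToken(tokenFile));
        return properties;
    }

    // Test helper: writes the given content to a temporary file and returns its path.
    static Path writeTempToken(String content) {
        try {
            Path file = Files.createTempFile("token", ".txt");
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            file.toFile().deleteOnExit();
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = writeTempToken("dummy-token\n");
        System.out.println(credentials(file).get("amqp-password")); // prints dummy-token
    }
}
```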

Building and deploying the application


Because the application is compiled into a native executable, GraalVM is required. For details on setting up the environment, see the corresponding instructions in the Quarkus Guide.

Then, following those instructions, download the source code, then build and deploy the application:

git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply

After these commands, the application will be deployed, but will not start until we configure the necessary messaging resources in AMQ Online.

Messaging setup


All that remains is to define the resources that our application needs in the messaging system. To do this, create: 1) an address space, which initializes the messaging endpoint; 2) an address, which configures the addresses used by the application; 3) a messaging user, which sets the client credentials.

Address space


The AddressSpace object in AMQ Online is a group of addresses that share connection endpoints, as well as authentication and authorization policies. When creating an address space, you can specify how the endpoints of the messaging system will be provided:

apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap

Addresses


Addresses are used to send and receive messages. Each address has a type, which defines its semantics, and a plan, which determines the amount of resources reserved for it. An address can be defined, for example, like this:

apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue

Messaging user


To ensure that only trusted applications can send and receive messages on your addresses, you must create a user in the messaging system. For applications running on the cluster, clients can be authenticated using an OpenShift service account. A “serviceaccount” user can be defined, for example, as follows:

apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]
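
The username in this manifest follows the Kubernetes naming scheme for service accounts, system:serviceaccount:<namespace>:<name>, here with the myapp project and its default service account. A small sketch of building and parsing such names is shown below; the class ServiceAccountUserSketch is hypothetical and only illustrates the format.

```java
// Hypothetical helper for the "system:serviceaccount:<namespace>:<name>" format.
class ServiceAccountUserSketch {
    private static final String PREFIX = "system:serviceaccount:";

    // Builds the username for a service account in the given namespace.
    static String username(String namespace, String serviceAccount) {
        return PREFIX + namespace + ":" + serviceAccount;
    }

    // Splits a username into { namespace, serviceAccount }.
    // Throws IllegalArgumentException if the format does not match.
    static String[] parse(String username) {
        String[] parts = username.split(":");
        boolean valid = parts.length == 4
                && "system".equals(parts[0])
                && "serviceaccount".equals(parts[1])
                && !parts[2].isEmpty()
                && !parts[3].isEmpty();
        if (!valid) {
            throw new IllegalArgumentException("Not a service account username: " + username);
        }
        return new String[] { parts[2], parts[3] };
    }

    public static void main(String[] args) {
        System.out.println(username("myapp", "default")); // prints system:serviceaccount:myapp:default
        String[] parsed = parse("system:serviceaccount:myapp:default");
        System.out.println(parsed[0] + " / " + parsed[1]); // prints myapp / default
    }
}
```

If you deploy the application into a project other than myapp, or run it under a different service account, the username in the MessagingUser has to change accordingly.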

Permissions for configuring the application


For AMQ Online to be able to create the configmap from which we injected the AMQP endpoint information, you must define a Role and a RoleBinding:

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
rules:
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  verbs: [ "create" ]
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  resourceNames: [ "quarkus-config" ]
  verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra
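
The Role has two rules because a create request cannot be restricted by resource name, while get, update, and patch can be limited to the quarkus-config configmap. The following sketch is a deliberately simplified model of how such rules combine (a request is allowed if any rule matches). It ignores apiGroups and everything else a real Kubernetes authorizer considers, and the class RoleRuleSketch is hypothetical.

```java
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of the two rules in the quarkus-config Role.
class RoleRuleSketch {

    static final class Rule {
        final Set<String> resources;
        final Set<String> resourceNames; // empty means "any name"
        final Set<String> verbs;

        Rule(Set<String> resources, Set<String> resourceNames, Set<String> verbs) {
            this.resources = resources;
            this.resourceNames = resourceNames;
            this.verbs = verbs;
        }

        // A rule matches when verb and resource match and the name is either
        // unrestricted or explicitly listed.
        boolean allows(String verb, String resource, String name) {
            return verbs.contains(verb)
                    && resources.contains(resource)
                    && (resourceNames.isEmpty() || resourceNames.contains(name));
        }
    }

    static final List<Rule> QUARKUS_CONFIG_ROLE = List.of(
            new Rule(Set.of("configmaps"), Set.of(), Set.of("create")),
            new Rule(Set.of("configmaps"), Set.of("quarkus-config"), Set.of("get", "update", "patch")));

    // The request is allowed if at least one rule allows it.
    static boolean allowed(String verb, String resource, String name) {
        return QUARKUS_CONFIG_ROLE.stream().anyMatch(rule -> rule.allows(verb, resource, name));
    }

    public static void main(String[] args) {
        System.out.println(allowed("create", "configmaps", "quarkus-config")); // prints true
        System.out.println(allowed("patch", "configmaps", "quarkus-config"));  // prints true
        System.out.println(allowed("patch", "configmaps", "other-config"));    // prints false
        System.out.println(allowed("delete", "configmaps", "quarkus-config")); // prints false
    }
}
```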

How to apply configurations


You can apply the messaging configuration like this:

cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address

Application verification


To verify that the application has started, first check that the corresponding address has been created and is active:

until [[ `oc get address quarkus-example.orders -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done
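
The shell loop above polls the address phase until it reads Active, and it will wait forever if that never happens. As a sketch of the same polling pattern with an upper bound on the number of attempts, the following plain-JDK code takes the phase lookup as a Supplier, so it can be tried without a cluster. The class PhasePollerSketch is hypothetical, and the phase names other than Active are stand-in values.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: polls a phase lookup until it reports "Active".
class PhasePollerSketch {

    // Returns the number of polls it took to see "Active",
    // or -1 if maxAttempts was reached or the thread was interrupted.
    static int waitUntilActive(Supplier<String> phase, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if ("Active".equals(phase.get())) {
                return attempt;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Stand-in for repeated "oc get address ..." calls; the values are made up.
        Iterator<String> phases = List.of("Pending", "Configuring", "Active").iterator();
        System.out.println(waitUntilActive(phases::next, 10, 0)); // prints 3
    }
}
```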

Then check the application route URL (just open this address in the browser):

echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"

The browser should show the values on the page updating periodically as messages are sent and received through AMQ Online.

To summarize


So, we wrote a Quarkus application that uses AMQP for messaging, set it up to run on the Red Hat OpenShift platform, and injected its configuration based on the AMQ Online configuration. Then we created the manifests needed to initialize the messaging system for our application.

This concludes the Quarkus series, but there are plenty of new and interesting things ahead. Stay with us!
