使用Quarkus和AMQ Online的Red Hat OpenShift基于云的消息传递

大家好!他在这里-我们来自Quarkus系列的最后一篇文章! (顺便说一下,看我们的网络研讨会“这是Quarkus - Kubernetes本地Java框架我们将展示如何从头开始或转让现成的解决方案开始)



前面的文章中,我们看了一下,使用它可以量化为现代化的Java的结果作出的改进相应的工具应用程序。

从版本0.17.0开始,Quarkus支持使用高级消息队列协议(AMQP),该协议是在应用程序或组织之间传输业务消息的开放标准。

红帽AMQ在线服务是基于开放的EnMasse项目构建的服务,并基于红帽OpenShift平台实现了消息传递机制。要了解有关其工作原理的更多信息,请参阅此处(至EN)。今天,我们展示了如何结合使用AMQ Online和Quarkus使用两种与消息处理相关的新技术来构建基于OpenShift的现代消息传递系统。

假定您已经在OpenShift平台上部署了AMQ Online(如果没有,请参阅安装指南)。

首先,我们将创建Quarkus应用程序,它将是一个使用反应式消息传递的简单订单处理系统。该应用程序将包括一个以固定间隔将订单发送到消息队列的订单生成器,以及一个将处理来自队列的消息并生成可在浏览器中查看的确认的订单处理器。

创建应用程序后,我们将展示如何在其中实现消息传递系统配置以及如何使用AMQ Online初始化此系统上所需的资源。

Quarkus应用


我们的Quarkus应用程序在OpenShift上运行,并且是amqp-quickstart的修改版本可以在这里找到客户端的完整示例

订单生成器


生成器每5秒简单地单调地将增长的订单标识符发送到“订单”地址。

@ApplicationScoped
public class OrderGenerator {
 
    private int orderId = 1;
 
    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
        .map(tick -> orderId++);
    }
}

订单处理程序


订单处理器甚至更简单,它只是将确认标识符返回到地址“ confirmations”。

@ApplicationScoped
public class OrderProcessor {
    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        //       <img draggable="false" class="emoji" alt=":-)" src="https://s.w.org/images/core/emoji/11.2.0/svg/1f642.svg">
        return order * 2;
    }
}

确认资源


确认资源是HTTP端点,用于列出我们的应用程序生成的确认。

@Path("/confirmations")
public class ConfirmationResource {
 
    @Inject
    @Stream("confirmations") Publisher<Integer> orders;
 
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
 
 
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return orders;
    }
}

客制化


要连接到AMQ Online,我们的应用程序将需要一些配置数据,即:Quarkus连接器配置,AMQP端点信息和客户端凭据。当然,最好将所有配置数据都保存在一个地方,但是我们将特别分离它们,以显示用于设置Quarkus应用程序的可能选项。

连接器


可以在编译阶段使用应用程序属性文件提供连接器配置:

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp

为了不使问题复杂化,我们仅将消息队列用于“订单”地址。应用程序中的地址“确认”将使用内存中的队列。

端点AMQP


在编译阶段,AMQP端点的主机名和端口号未知,因此需要实现它们。可以在由AMQ Online创建的configmap中设置端点,因此我们将通过应用清单中的环境变量来定义它们:

spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp

证书


服务帐户令牌可用于在OpenShift中验证我们的应用程序。为此,您必须首先创建一个自定义ConfigSource,它将从pod文件系统中读取身份验证令牌:

public class MessagingCredentialsConfigSource implements ConfigSource {
    private static final Set<String> propertyNames;
 
    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }
 
    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }
 
    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
 
    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return null;
    }
 
    @Override
    public String getName() {
        return "messaging-credentials-config";
    }
 
    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}

组装和部署应用程序


由于必须将应用程序编译为可执行文件,因此需要GraalVM虚拟机。有关如何为此设置环境的更多信息,请参见Quarkus Guide中的相关说明

然后,按照那里的说明进行操作,您需要下载源代码,构建并部署我们的应用程序:

git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply

执行这些命令后,将部署应用程序,但直到我们在AMQ Online中配置必要的消息资源后,该应用程序才能启动。

讯息设定


现在,剩下的就是设置消息传递系统中我们的应用程序所需的资源。为此,创建:1)地址空间以初始化消息传递系统的端点;2)地址,用于配置我们在应用程序中使用的地址;3)消息传递系统的用户设置客户端的凭据。

地址空间


AMQ Online中的AddressSpace对象是一组地址,这些地址共享连接端点以及身份验证和授权策略。创建地址空间时,可以指定如何提供消息传递系统的端点:

apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap

地址


地址用于发送和接收消息。每个地址都有一个定义其语义的类型,以及一个设置保留资源数量的计划。例如,可以这样确定地址:

apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue

讯息使用者


若要将消息发送和接收到只有可信应用程序可以发送到您的地址的消息,必须在消息传递系统中创建一个用户。对于在群集上运行的应用程序,可以使用OpenShift服务帐户对客户端进行身份验证。例如,可以定义用户“ serviceaccount”:

apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]

定制应用程序的权限


为了使AMQ Online创建用于实现AMQP端点信息的configmap,您必须指定角色和角色绑定(Role和RoleBinding):

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
spec:
  rules:
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    verbs: [ "create" ]
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    resourceNames: [ "quarkus-config" ]
    verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra

如何套用设定


您可以这样应用消息传递配置:

cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address

应用验证


为了确保应用程序已启动,首先,我们将检查相应的地址是否已创建并处于活动状态:

until [[ `oc get address quarkus-example.prices -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done

然后检查应用程序路由URL(只需在浏览器中打开此地址):

echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"

在浏览器中应该可以看到,随着AMQ Online发送和接收消息,票证会定期更新。

总结一下


因此,我们编写了一个使用AMQP进行消息传递的Quarkus应用程序,将该应用程序设置为可在Red Hat OpenShift平台上运行,并且还基于AMQ Online配置实现了其配置。然后,我们创建了初始化应用程序消息传递系统所必需的清单。

Quarkus系列到此结束,但是还有很多新的有趣的事情,请留在我们身边!

All Articles