Skip to content

RocketMQ 之集成 SpringBoot

Github: https://github.com/apache/rocketmq-spring

SpringBoot Sample: https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples

SpringBoot AutoConfiguration

  • org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
  • org.apache.rocketmq.spring.autoconfigure.RocketMQProperties
xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>

rocketmq-spring-boot-starter 最新版本:

produce

参考:https://gitee.com/mengweijin/learning/tree/master/rocketmq-produce

yaml
rocketmq:
  name-server: localhost:9876
  producer:
    group: my-group1
java
package com.github.mengweijin.learning.rocketmq.produce.service;

import com.github.mengweijin.learning.rocketmq.produce.dto.UserDTO;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

/**
 * @author mengweijin
 */
@Service
public class ProduceService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private static final String STRING_TOPIC = "string-topic";
    private static final String USER_TOPIC = "user-topic";

    public void sendMsg() {
        // Send string
        SendResult sendResult = rocketMQTemplate.syncSend(STRING_TOPIC, "Hello, World!");
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", STRING_TOPIC, sendResult);

        sendResult = rocketMQTemplate.syncSend(USER_TOPIC, new UserDTO().setAge(18).setName("Kitty"));
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", USER_TOPIC, sendResult);

        sendResult = rocketMQTemplate.syncSend(USER_TOPIC, MessageBuilder.withPayload(
                new UserDTO().setAge(21).setName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", USER_TOPIC, sendResult);

        // Send string with spring Message
        sendResult = rocketMQTemplate.syncSend(STRING_TOPIC, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        System.out.printf("syncSend2 to topic %s sendResult=%s %n", STRING_TOPIC, sendResult);

        // 异步发送
        // Send user-defined object
        rocketMQTemplate.asyncSend(USER_TOPIC, new UserDTO().setAge(25).setName("Tom"), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }
            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }
        });

        // Send message with special tag     // tag0 will not be consumer-selected
        rocketMQTemplate.convertAndSend(STRING_TOPIC + ":tag0", "I'm from tag0");
        System.out.printf("syncSend topic %s tag %s %n", STRING_TOPIC, "tag0");
        rocketMQTemplate.syncSend(STRING_TOPIC + ":tag1", "I'm from tag1");
        System.out.printf("syncSend topic %s tag %s %n", STRING_TOPIC, "tag1");
    }
}

consume

参考:https://gitee.com/mengweijin/learning/tree/master/rocketmq-consume

yaml
rocketmq:
  name-server: localhost:9876
  consumer:
    group: my-group1
    topic: string-topic
java
@Service
@RocketMQMessageListener(topic = "string-topic", consumerGroup = "string_Tag0_consumer-Group", selectorExpression = "tag0")
// @RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class StringTag0Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringTag0Consumer received: %s \n", message);
    }
}

@Service
@RocketMQMessageListener(topic = "string-topic", consumerGroup = "string_All_Tag_consumer-Group")
// @RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class StringAllTagConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringAllTagConsumer received: %s \n", message);
    }
}

@Service
@RocketMQMessageListener(nameServer = "localhost:9876", topic = "user-topic", consumerGroup = "user_consumer-Group")
// @RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<UserDTO> {
    @Override
    public void onMessage(UserDTO message) {
        System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n", message, message.getAge(), message.getName());
    }
}