RocketMQ 之集成 SpringBoot
Github: https://github.com/apache/rocketmq-spring
SpringBoot Sample: https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples
SpringBoot AutoConfiguration
- org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
- org.apache.rocketmq.spring.autoconfigure.RocketMQProperties
xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>
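rocketmq-spring-boot-starter 最新版本:请在 Maven Central 查询(https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter),并用查到的版本号替换上面的 ${RELEASE.VERSION}。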
produce
参考:https://gitee.com/mengweijin/learning/tree/master/rocketmq-produce
yaml
rocketmq:
  name-server: localhost:9876
  producer:
    group: my-group1
java
package com.github.mengweijin.learning.rocketmq.produce.service;
import com.github.mengweijin.learning.rocketmq.produce.dto.UserDTO;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
/**
 * Sample producer that publishes demo messages to RocketMQ through {@link RocketMQTemplate}.
 *
 * <p>{@link #sendMsg()} issues, in order: several {@code syncSend} calls (plain strings,
 * {@code UserDTO} payloads and Spring {@code Message} wrappers), one {@code asyncSend} of a
 * {@code UserDTO}, and two tagged string messages whose destination is written as
 * {@code topic:tag}. Progress is reported on standard output.
 *
 * @author mengweijin
 */
@Service
public class ProduceService {

    private static final String STRING_TOPIC = "string-topic";
    private static final String USER_TOPIC = "user-topic";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Runs the complete demo sequence: synchronous sends first, then one asynchronous send,
     * then the tagged sends. The order matches the order in which results are printed.
     */
    public void sendMsg() {
        sendSynchronously();
        sendUserAsynchronously();
        sendTaggedStrings();
    }

    /** Sends string and {@code UserDTO} payloads with {@code syncSend} and prints each result. */
    private void sendSynchronously() {
        // Plain string payload.
        SendResult sendResult = rocketMQTemplate.syncSend(STRING_TOPIC, "Hello, World!");
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", STRING_TOPIC, sendResult);

        // User-defined object payload, passed directly.
        sendResult = rocketMQTemplate.syncSend(USER_TOPIC, new UserDTO().setAge(18).setName("Kitty"));
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", USER_TOPIC, sendResult);

        // User-defined object wrapped in a Spring Message with an explicit JSON content type header.
        sendResult = rocketMQTemplate.syncSend(USER_TOPIC, MessageBuilder
                .withPayload(new UserDTO().setAge(21).setName("Lester"))
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                .build());
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", USER_TOPIC, sendResult);

        // String payload wrapped in a Spring Message.
        sendResult = rocketMQTemplate.syncSend(STRING_TOPIC,
                MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        System.out.printf("syncSend2 to topic %s sendResult=%s %n", STRING_TOPIC, sendResult);
    }

    /**
     * Sends a {@code UserDTO} with {@code asyncSend}; the outcome is reported by the callback
     * rather than by a return value.
     */
    private void sendUserAsynchronously() {
        rocketMQTemplate.asyncSend(USER_TOPIC, new UserDTO().setAge(25).setName("Tom"), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("async onSuccess SendResult=%s %n", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.printf("async onException Throwable=%s %n", throwable);
            }
        });
    }

    /**
     * Sends two strings to {@link #STRING_TOPIC} with different tags. The tag is appended to the
     * topic after a colon ({@code topic:tag}).
     */
    private void sendTaggedStrings() {
        // tag0 is the tag a consumer subscribing with selectorExpression = "tag0" filters on.
        rocketMQTemplate.convertAndSend(STRING_TOPIC + ":tag0", "I'm from tag0");
        System.out.printf("syncSend topic %s tag %s %n", STRING_TOPIC, "tag0");

        // tag1 is not matched by a "tag0" selector; only unfiltered subscriptions should see it.
        rocketMQTemplate.syncSend(STRING_TOPIC + ":tag1", "I'm from tag1");
        System.out.printf("syncSend topic %s tag %s %n", STRING_TOPIC, "tag1");
    }
}
consume
参考:https://gitee.com/mengweijin/learning/tree/master/rocketmq-consume
yaml
rocketmq:
  name-server: localhost:9876
  consumer:
    group: my-group1
    topic: string-topic
java
/**
 * Listener for {@code string-topic} registered under the consumer group
 * {@code string_Tag0_consumer-Group} with the selector expression {@code tag0}.
 * Each received payload is echoed to standard output.
 */
@Service
@RocketMQMessageListener(
        topic = "string-topic",
        consumerGroup = "string_Tag0_consumer-Group",
        selectorExpression = "tag0")
// Property-driven alternative to the hard-coded values above:
// @RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class StringTag0Consumer implements RocketMQListener<String> {

    /** Output template; the single placeholder is the received message body. */
    private static final String RECEIVED_FORMAT = "------- StringTag0Consumer received: %s \n";

    /**
     * Prints the received message.
     *
     * @param message the string payload delivered to this listener
     */
    @Override
    public void onMessage(String message) {
        System.out.format(RECEIVED_FORMAT, message);
    }
}
/**
 * Listener for {@code string-topic} registered under the consumer group
 * {@code string_All_Tag_consumer-Group}. No selector expression is given, so the
 * annotation's default applies (presumably "all tags", as the class name suggests).
 * Each received payload is echoed to standard output.
 */
@Service
@RocketMQMessageListener(
        topic = "string-topic",
        consumerGroup = "string_All_Tag_consumer-Group")
// Property-driven alternative to the hard-coded values above:
// @RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class StringAllTagConsumer implements RocketMQListener<String> {

    /** Output template; the single placeholder is the received message body. */
    private static final String RECEIVED_FORMAT = "------- StringAllTagConsumer received: %s \n";

    /**
     * Prints the received message.
     *
     * @param message the string payload delivered to this listener
     */
    @Override
    public void onMessage(String message) {
        System.out.format(RECEIVED_FORMAT, message);
    }
}
/**
 * Listener for {@code user-topic} registered under the consumer group
 * {@code user_consumer-Group}, with the name server address given explicitly on the
 * annotation. Each received {@code UserDTO} is echoed to standard output together with
 * its age and name.
 */
@Service
@RocketMQMessageListener(
        nameServer = "localhost:9876",
        topic = "user-topic",
        consumerGroup = "user_consumer-Group")
// Property-driven alternative to the hard-coded values above:
// @RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<UserDTO> {

    /** Output template; placeholders are the whole DTO, its age, and its name, in that order. */
    private static final String RECEIVED_FORMAT =
            "######## user_consumer received: %s ; age: %s ; name: %s \n";

    /**
     * Prints the received user together with its age and name.
     *
     * @param message the deserialized user payload delivered to this listener
     */
    @Override
    public void onMessage(UserDTO message) {
        System.out.format(RECEIVED_FORMAT, message, message.getAge(), message.getName());
    }
}