在Spring Boot中使用RocketMQ事务消息进行消息的发送和接收的代码示例(核心配置)
以下是 Spring Boot 中使用 RocketMQ 事务消息的核心配置 + 极简可运行代码示例,聚焦关键配置和核心逻辑,剔除冗余代码,便于快速集成到项目中。
<!-- Maven dependencies for the example: the Spring Boot core starter plus the RocketMQ Spring Boot starter. -->
<dependencies>
    <!-- No explicit version here: it is expected to come from the project's
         Spring Boot parent / dependency management (not shown in this snippet). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- RocketMQ integration for Spring Boot, pinned to 2.2.3. -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
</dependencies>
# RocketMQ client configuration (application.yml).
# Nesting is significant in YAML: every key below must be indented under
# `rocketmq` so that it binds as rocketmq.name-server, rocketmq.producer.group, etc.
rocketmq:
  # NameServer address; must match the actual deployment.
  name-server: 127.0.0.1:9876
  producer:
    # Producer group used when sending the transactional message.
    group: tx-producer-group
    # NOTE(review): presumably milliseconds - confirm against the starter's property docs.
    send-message-timeout: 3000
  consumer:
    group: tx-consumer-group
    # NOTE(review): confirm that rocketmq-spring-boot-starter 2.2.3 actually binds this key;
    # if it does not, it is silently ignored and consumer retry behaviour has to be
    # configured elsewhere (e.g. on the @RocketMQMessageListener annotation).
    retry-times-when-consume-failed: 3
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Sends RocketMQ transactional (half) messages and hosts the local-transaction
 * listener that decides whether each half message is committed or rolled back.
 */
@Component
public class TxProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends {@code content} as a transactional message to {@code tx_topic} with tag {@code tx_tag}.
     *
     * @param bizId   business identifier; handed to the transaction listener as the
     *                {@code arg} of {@code executeLocalTransaction}
     * @param content message payload
     */
    public void sendTxMsg(String bizId, String content) {
        Message<String> txMsg = MessageBuilder
                .withPayload(content)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                .build();
        // Destination format is "topic:tag".
        rocketMQTemplate.sendMessageInTransaction(
                "tx_topic:tx_tag",
                txMsg,
                bizId
        );
    }

    /**
     * Local-transaction callback for messages sent through {@link RocketMQTemplate}.
     *
     * <p>Declared {@code static} so that Spring component scanning can register it: a
     * non-static inner class is not an independent class and is skipped by the scanner,
     * which would leave the template without any transaction listener.
     *
     * <p>The annotation carries no {@code txProducerGroup} attribute: that attribute belongs
     * to the older starter API, whereas the three-argument {@code sendMessageInTransaction}
     * used above belongs to the newer one, in which the listener is bound to the template.
     */
    @org.apache.rocketmq.spring.annotation.RocketMQTransactionListener
    public static class TxListener implements org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener {

        /**
         * Runs the local business transaction for a half message.
         *
         * @param msg the half message
         * @param arg the {@code bizId} passed to {@code sendMessageInTransaction}
         * @return COMMIT when the local business succeeds, ROLLBACK when it reports failure,
         *         UNKNOWN when it throws
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                String bizId = (String) arg;
                String content = payloadAsString(msg);
                boolean isSuccess = doLocalBiz(bizId, content);
                if (isSuccess) {
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT;
                } else {
                    return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
                }
            } catch (Exception e) {
                // Outcome is undetermined; UNKNOWN defers the decision to checkLocalTransaction.
                return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        /**
         * Re-evaluates the local transaction outcome for a half message.
         *
         * @param msg the half message being checked
         * @return COMMIT when {@code checkLocalBiz} reports success, otherwise ROLLBACK
         */
        @Override
        public org.apache.rocketmq.spring.core.RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String content = payloadAsString(msg);
            boolean isSuccess = checkLocalBiz(content);
            return isSuccess ?
                    org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.COMMIT :
                    org.apache.rocketmq.spring.core.RocketMQLocalTransactionState.ROLLBACK;
        }

        /**
         * Decodes the message payload to text.
         *
         * <p>The starter hands listener callbacks the raw message body as {@code byte[]},
         * so a direct {@code (String)} cast throws {@link ClassCastException}. A
         * {@code String} payload is still accepted unchanged.
         */
        private static String payloadAsString(Message<?> msg) {
            Object payload = msg.getPayload();
            if (payload instanceof byte[]) {
                // NOTE(review): UTF-8 is assumed to match the template's charset - confirm if customised.
                return new String((byte[]) payload, java.nio.charset.StandardCharsets.UTF_8);
            }
            return String.valueOf(payload);
        }

        /** Placeholder for the real local business transaction; always reports success. */
        private boolean doLocalBiz(String bizId, String content) {
            return true;
        }

        /** Placeholder for the real outcome lookup; always reports success. */
        private boolean checkLocalBiz(String content) {
            return true;
        }
    }
}
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Listener for {@code tx_topic} (tag {@code tx_tag}) in consumer group
 * {@code tx-consumer-group}. Each message goes through an idempotency check before
 * the business handler runs.
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "tx-consumer-group",
        topic = "tx_topic",
        selectorExpression = "tx_tag"
)
public class TxConsumer implements RocketMQListener<String> {

    /**
     * Handles one delivered message.
     *
     * @param msg message body
     * @throws RuntimeException wrapping any failure raised while handling the message
     */
    @Override
    public void onMessage(String msg) {
        try {
            if (!checkIdempotent(msg)) {
                // First delivery: run the business logic, then record it as consumed.
                doConsumeBiz(msg);
                recordConsumeStatus(msg);
                System.out.println("消费成功:" + msg);
            } else {
                System.out.println("消息已消费,跳过:" + msg);
            }
        } catch (Exception e) {
            // Surface the failure to the caller with the original cause attached.
            throw new RuntimeException("消费失败,重试", e);
        }
    }

    /** Placeholder duplicate check; currently never reports a message as already consumed. */
    private boolean checkIdempotent(String msg) {
        return false;
    }

    /** Placeholder for persisting the consumed marker; currently a no-op. */
    private void recordConsumeStatus(String msg) {
    }

    /** Placeholder for the real consume-side business logic; currently a no-op. */
    private void doConsumeBiz(String msg) {
    }
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/** Spring Boot entry point for the RocketMQ transactional-message example. */
@SpringBootApplication
public class RocketMQTxApp {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(RocketMQTxApp.class);
        application.run(args);
    }
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Smoke test: sends one transactional message through {@link TxProducer} and then
 * keeps the test alive for a fixed pause. It makes no assertions.
 */
@SpringBootTest
public class TxTest {

    /** Pause after sending, in milliseconds, before the test method returns. */
    private static final long WAIT_AFTER_SEND_MILLIS = 5000L;

    @Autowired
    private TxProducer txProducer;

    /**
     * Sends a single transactional message and waits.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void testTxMsg() throws InterruptedException {
        final String bizId = "BIZ_001";
        final String payload = "测试事务消息";
        txProducer.sendTxMsg(bizId, payload);
        Thread.sleep(WAIT_AFTER_SEND_MILLIS);
    }
}
- 本地 / 服务器已启动 RocketMQ NameServer(9876 端口)和 Broker;
- 核心配置中的 name-server 地址需与实际部署一致。
此示例保留了 RocketMQ 事务消息的核心配置和核心逻辑,可直接复制到项目中,替换 doLocalBiz/doConsumeBiz 为实际业务逻辑即可快速集成。
版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/9921.html
文章版权归作者所有,未经允许请勿转载。
THE END