阿里云消息队列 RocketMQ 5.x版- springboot JAVA客户端接入方式
主要是公司为了高可用方案 使用的是阿里云的RocketMQ 5.x版
公司用的spring cloud alibaba 那一套 架构
不过阿里云的mq不会自动创建 topic和group 需要在控制台手动创建 不然会报错
下面配置中的实例账户密码需要在这里查看
首先在项目中引入
<rocketmq-version>2.3.0</rocketmq-version>
<!-- rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-version}</version>
<exclusions>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
nacos配置:
rocketmq:
#如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,接入点应该是类似这样的格式 http://MQ_INST_XXX:xxx,注意!!!一定要有http协议头
#如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,接入点应该是类似这样的格式 rmq-cn-xxx.xx:xxx,注意!!!一定不要自己添加http协议头
name-server: z9x07.cn-hangzhou.rmq.aliyuncs.com:8080
consumer:
#如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。
access-key: L6JJ6Yy2xxx
#如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。
secret-key: 582F4nF1xx
# 注意,4.0实例需要在控制台实例详情中查看是否有命名空间,无命名空间则不需要填写此项
instance-name: rmq-cn-x0r37
producer:
#如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。
access-key: L6JJ6Yy2xxx
#如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。
secret-key: 582F4nF1xx
group: lili_group
send-message-timeout: 30000
消费端:
package com.honghan.im.listener;
import cn.hutool.json.JSONUtil;
import com.honghan.im.config.GoodsTagsEnum;
import com.honghan.im.domain.FootPrint;
import com.honghan.im.service.impl.BusinessServiceImpl;
import com.honghan.im.service.FootprintService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 商品消息
*
* @author LinXin
* @since 2024/6/2114:46
**/
@Component
@Slf4j
@RocketMQMessageListener(topic = "${lili.data.rocketmq.goods-topic}", consumerGroup = "${lili.data.rocketmq.goods-group}")
public class GoodsMessageListener implements RocketMQListener<MessageExt> {
private static final int BATCH_SIZE = 10;
/**
* 用户足迹
*/
@Autowired
private FootprintService footprintService;
@Autowired
private BusinessServiceImpl businessService;
@Override
// @RetryOperation
public void onMessage(MessageExt messageExt) {
switch (GoodsTagsEnum.valueOf(messageExt.getTags())) {
//查看商品
case VIEW_GOODS:
FootPrint footPrint = JSONUtil.toBean(new String(messageExt.getBody()), FootPrint.class);
footprintService.saveFootprint(footPrint);
break; //商品评价
case GOODS_COMMENT_COMPLETE:
businessService.updateGoodsCommentNum(new String(messageExt.getBody()));
break;
default:
log.error("商品执行异常:{}", new String(messageExt.getBody()));
break;
}
}
}
生产端:
package com.honghan.im.dubbo;
import com.honghan.common.core.config.RocketmqCustomProperties;
import com.honghan.im.api.RemoteIMRocketMQService;
import com.honghan.im.config.GoodsTagsEnum;
import com.honghan.common.core.config.RocketmqSendCallbackBuilder;
import com.honghan.im.domain.FootPrint;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@RequiredArgsConstructor
@Service
@DubboService
@Slf4j
public class RemoteIMRocketMQServiceImpl implements RemoteIMRocketMQService {
/**
* rocketMq
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* rocketMq配置
*/
@Autowired
private RocketmqCustomProperties rocketmqCustomProperties;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void asyncSend(Long userId, Long shopId, Long goodsId, String skuId) {
FootPrint footPrint = new FootPrint(userId+"", shopId+"", goodsId+"", skuId+"");
String destination = rocketmqCustomProperties.getGoodsTopic() + ":" + GoodsTagsEnum.VIEW_GOODS.name();
rocketMQTemplate.asyncSend(destination, footPrint, RocketmqSendCallbackBuilder.commonCallback());
}
@Override
public void publishEvent(String text) {
String destination = rocketmqCustomProperties.getGoodsTopic() + ":" + GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name();
//发送订单变更mq消息
rocketMQTemplate.asyncSend(destination, text, RocketmqSendCallbackBuilder.commonCallback());
//rocketMQTemplate.asyncSend( rocketmqCustomProperties.getGoodsTopic(), GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name(), text)));
// applicationEventPublisher.publishEvent(new TransactionCommitSendMQEvent("同步商品评价消息",
// rocketmqCustomProperties.getGoodsTopic(), GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name(),
// text));
}
}