阿里云消息队列 RocketMQ 5.x版- springboot JAVA客户端接入方式 Published on Jul 17, 2024 in 随笔 with 0 comment 主要是公司为了高可用方案 使用的是阿里云的RocketMQ 5.x版 公司用的spring cloud alibaba 那一套 架构 不过阿里云的mq不会自动创建 topic和group 需要在控制台手动创建 不然会报错  下面配置中的实例账户密码需要在这里查看  首先在项目中引入 ``` 2.3.0 org.apache.rocketmq rocketmq-spring-boot-starter ${rocketmq-version} fastjson com.alibaba slf4j-api org.slf4j ``` nacos配置: ``` rocketmq: #如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,接入点应该是类似这样的格式 http://MQ_INST_XXX:xxx,注意!!!一定要有http协议头 #如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,接入点应该是类似这样的格式 rmq-cn-xxx.xx:xxx,注意!!!一定不要自己添加http协议头 name-server: z9x07.cn-hangzhou.rmq.aliyuncs.com:8080 consumer: #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。 access-key: L6JJ6Yy2xxx #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。 secret-key: 582F4nF1xx # 注意,4.0实例需要在控制台实例详情中查看是否有命名空间,无命名空间则不需要填写此项 instance-name: rmq-cn-x0r37 producer: #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。 access-key: L6JJ6Yy2xxx #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。 secret-key: 582F4nF1xx group: lili_group send-message-timeout: 30000 ``` 消费端: ```java package com.honghan.im.listener; import cn.hutool.json.JSONUtil; import com.honghan.im.config.GoodsTagsEnum; import com.honghan.im.domain.FootPrint; import com.honghan.im.service.impl.BusinessServiceImpl; import com.honghan.im.service.FootprintService; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 商品消息 * * @author LinXin * @since 2024/6/2114:46 **/ @Component @Slf4j @RocketMQMessageListener(topic = "${lili.data.rocketmq.goods-topic}", consumerGroup = "${lili.data.rocketmq.goods-group}") public class GoodsMessageListener implements RocketMQListener { private static final int BATCH_SIZE = 10; /** * 用户足迹 */ @Autowired private FootprintService footprintService; @Autowired private BusinessServiceImpl businessService; @Override // @RetryOperation public void onMessage(MessageExt messageExt) { switch (GoodsTagsEnum.valueOf(messageExt.getTags())) { //查看商品 case VIEW_GOODS: FootPrint footPrint = JSONUtil.toBean(new String(messageExt.getBody()), FootPrint.class); footprintService.saveFootprint(footPrint); break; //商品评价 case GOODS_COMMENT_COMPLETE: businessService.updateGoodsCommentNum(new String(messageExt.getBody())); break; default: log.error("商品执行异常:{}", new String(messageExt.getBody())); break; } } } ``` 生产端: ```java package com.honghan.im.dubbo; import com.honghan.common.core.config.RocketmqCustomProperties; import com.honghan.im.api.RemoteIMRocketMQService; import com.honghan.im.config.GoodsTagsEnum; import com.honghan.common.core.config.RocketmqSendCallbackBuilder; import com.honghan.im.domain.FootPrint; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboService; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @RequiredArgsConstructor @Service @DubboService @Slf4j public class RemoteIMRocketMQServiceImpl implements RemoteIMRocketMQService { /** * rocketMq */ @Autowired private RocketMQTemplate rocketMQTemplate; /** * rocketMq配置 */ @Autowired private RocketmqCustomProperties rocketmqCustomProperties; @Autowired private ApplicationEventPublisher applicationEventPublisher; @Override public void asyncSend(Long userId, Long shopId, Long goodsId, String skuId) { FootPrint footPrint = new FootPrint(userId+"", shopId+"", goodsId+"", skuId+""); String destination = rocketmqCustomProperties.getGoodsTopic() + ":" + GoodsTagsEnum.VIEW_GOODS.name(); rocketMQTemplate.asyncSend(destination, footPrint, RocketmqSendCallbackBuilder.commonCallback()); } @Override public void publishEvent(String text) { String destination = rocketmqCustomProperties.getGoodsTopic() + ":" + GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name(); //发送订单变更mq消息 rocketMQTemplate.asyncSend(destination, text, RocketmqSendCallbackBuilder.commonCallback()); //rocketMQTemplate.asyncSend( rocketmqCustomProperties.getGoodsTopic(), GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name(), text))); // applicationEventPublisher.publishEvent(new TransactionCommitSendMQEvent("同步商品评价消息", // rocketmqCustomProperties.getGoodsTopic(), GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name(), // text)); } } ``` 本文由 admin 创作,采用 知识共享署名4.0 国际许可协议进行许可。本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。