• 首页
  • 邻居
  • 关于
  • 归档
  • 搜索
  • 夜间模式
    ©2020-2026  我的学习笔记 Theme by OneBlog

    我的学习笔记博客

    搜索
    标签
    # 随笔 # Java # 教程 # openwrt # Mysql # SQL # 爬虫 # post # Js调优 # MAVEN
  • 首页>
  • 随笔>
  • 正文
  • 阿里云消息队列 RocketMQ 5.x版- springboot JAVA客户端接入方式

    2024年07月17日 1.5 k 阅读 0 评论 5979 字

    主要是公司为了高可用方案 使用的是阿里云的RocketMQ 5.x版

    公司用的spring cloud alibaba 那一套 架构

    不过阿里云的mq不会自动创建 topic和group 需要在控制台手动创建 不然会报错

    下面配置中的实例账户密码需要在这里查看


    首先在项目中引入

     <rocketmq-version>2.3.0</rocketmq-version>
     
     <!-- rocketmq-spring-boot-starter -->
                <dependency>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-spring-boot-starter</artifactId>
                    <version>${rocketmq-version}</version>
                    <exclusions>
                        <exclusion>
                            <artifactId>fastjson</artifactId>
                            <groupId>com.alibaba</groupId>
                        </exclusion>
                        <exclusion>
                            <artifactId>slf4j-api</artifactId>
                            <groupId>org.slf4j</groupId>
                        </exclusion>
                    </exclusions>
                </dependency>

    nacos配置:

    rocketmq:
    #如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,接入点应该是类似这样的格式  http://MQ_INST_XXX:xxx,注意!!!一定要有http协议头
    #如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,接入点应该是类似这样的格式  rmq-cn-xxx.xx:xxx,注意!!!一定不要自己添加http协议头
      name-server: z9x07.cn-hangzhou.rmq.aliyuncs.com:8080
      consumer:
       #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。
       access-key: L6JJ6Yy2xxx
       #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。
       secret-key: 582F4nF1xx
       # 注意,4.0实例需要在控制台实例详情中查看是否有命名空间,无命名空间则不需要填写此项
       instance-name: rmq-cn-x0r37
      producer:
       #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。
       access-key: L6JJ6Yy2xxx
       #如果使用阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。
       secret-key: 582F4nF1xx
       group: lili_group
       send-message-timeout: 30000

    消费端:

    package com.honghan.im.listener;
    
    import cn.hutool.json.JSONUtil;
    import com.honghan.im.config.GoodsTagsEnum;
    import com.honghan.im.domain.FootPrint;
    import com.honghan.im.service.impl.BusinessServiceImpl;
    import com.honghan.im.service.FootprintService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * 商品消息
     *
     * @author LinXin
     * @since 2024/6/2114:46
     **/
    @Component
    @Slf4j
    @RocketMQMessageListener(topic = "${lili.data.rocketmq.goods-topic}", consumerGroup = "${lili.data.rocketmq.goods-group}")
    public class GoodsMessageListener implements RocketMQListener<MessageExt> {
    
        private static final int BATCH_SIZE = 10;
    
    
        /**
         * 用户足迹
         */
        @Autowired
        private FootprintService footprintService;
        @Autowired
        private BusinessServiceImpl businessService;
    
        @Override
      //  @RetryOperation
        public void onMessage(MessageExt messageExt) {
    
            switch (GoodsTagsEnum.valueOf(messageExt.getTags())) {
                //查看商品
                case VIEW_GOODS:
                    FootPrint footPrint = JSONUtil.toBean(new String(messageExt.getBody()), FootPrint.class);
                    footprintService.saveFootprint(footPrint);
                    break;   //商品评价
                case GOODS_COMMENT_COMPLETE:
                    businessService.updateGoodsCommentNum(new String(messageExt.getBody()));
                    break;
                default:
                    log.error("商品执行异常:{}", new String(messageExt.getBody()));
                    break;
            }
        }
    }
    

    生产端:

    package com.honghan.im.dubbo;
    
    import com.honghan.common.core.config.RocketmqCustomProperties;
    import com.honghan.im.api.RemoteIMRocketMQService;
    import com.honghan.im.config.GoodsTagsEnum;
    import com.honghan.common.core.config.RocketmqSendCallbackBuilder;
    import com.honghan.im.domain.FootPrint;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.dubbo.config.annotation.DubboService;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.stereotype.Service;
    
    @RequiredArgsConstructor
    @Service
    @DubboService
    @Slf4j
    public class RemoteIMRocketMQServiceImpl implements RemoteIMRocketMQService {
        /**
         * rocketMq
         */
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        /**
         * rocketMq配置
         */
        @Autowired
        private RocketmqCustomProperties rocketmqCustomProperties;
    
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
    
        @Override
        public void asyncSend(Long userId, Long shopId, Long goodsId, String skuId) {
            FootPrint footPrint = new FootPrint(userId+"", shopId+"", goodsId+"", skuId+"");
            String destination = rocketmqCustomProperties.getGoodsTopic() + ":" + GoodsTagsEnum.VIEW_GOODS.name();
            rocketMQTemplate.asyncSend(destination, footPrint, RocketmqSendCallbackBuilder.commonCallback());
        }
    
        @Override
        public void publishEvent(String text) {
            String destination = rocketmqCustomProperties.getGoodsTopic() + ":" + GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name();
            //发送订单变更mq消息
            rocketMQTemplate.asyncSend(destination, text, RocketmqSendCallbackBuilder.commonCallback());
    
            //rocketMQTemplate.asyncSend( rocketmqCustomProperties.getGoodsTopic(), GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name(), text)));
    
    //        applicationEventPublisher.publishEvent(new TransactionCommitSendMQEvent("同步商品评价消息",
    //            rocketmqCustomProperties.getGoodsTopic(), GoodsTagsEnum.GOODS_COMMENT_COMPLETE.name(),
    //            text));
        }
    }
    
    本文著作权归作者 [ admin ] 享有,未经作者书面授权,禁止转载,封面图片来源于 [ 互联网 ] ,本文仅供个人学习、研究和欣赏使用。如有异议,请联系博主及时处理。
    取消回复

    发表留言
    回复

    首页邻居关于归档
    Copyright©2020-2026  All Rights Reserved.  Load:0.016 s
    京ICP备18019712号
    Theme by OneBlog V3.6.5
    夜间模式

    开源不易,请尊重作者版权,保留基本的版权信息。