• 首页
  • 邻居
  • 关于
  • 归档
  • 搜索
  • 夜间模式
    ©2020-2026  我的学习笔记 Theme by OneBlog

    我的学习笔记博客

    搜索
    标签
    # 随笔 # Java # 教程 # openwrt # Mysql # SQL # 爬虫 # post # Js调优 # MAVEN
  • 首页>
  • Java>
  • 正文
  • dubbo自定义泛化调用接口

    2023年05月09日 1.4 k 阅读 0 评论 16332 字

    在做公司业务时,有一个需求:让 server 端通过 Dubbo 泛化调用去调用某个特定的 client 端。
    我是根据各 client 端的 dubbo.application.name 来判断并选出目标服务的。

    package com.jinw.utils.rpc;
    
    import cn.hutool.core.util.ObjectUtil;
    import cn.hutool.core.util.StrUtil;
    import com.jinw.constant.ApiConstant;
    import org.apache.dubbo.config.ReferenceConfig;
    import org.apache.dubbo.config.RegistryConfig;
    
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Process-wide cache of Dubbo generic-invocation {@link ReferenceConfig} objects.
     *
     * <p>Entries live in an access-ordered {@link LinkedHashMap}; every read and write of the map
     * happens inside {@code synchronized (referenceConfigMap)}. A background task runs once a
     * minute and removes entries that have not been requested for {@link #CACHE_EXPIRE_TIME}
     * seconds. When an insert pushes the map above {@link #MAX_CACHE_SIZE}, the least recently
     * used entry is removed immediately. Removed configs are destroyed so they do not linger
     * after leaving the cache.
     *
     * <p>The cache key contains the interface name, the registry id, the group, the version and
     * (for direct connections) the target URL, so references to different targets never shadow
     * each other. The timeout is NOT part of the key: the timeout of the first request for a key
     * is the one that stays configured on the cached reference.
     *
     * @author liuxin
     */
    public class DubboReferenceCache {

        /** Access-ordered, so iteration starts at the least recently requested entry. */
        private static final Map<String, CacheEntry> referenceConfigMap = new LinkedHashMap<>(16, 0.75f, true);
        /** Upper bound on the number of cached references. */
        private static final int MAX_CACHE_SIZE = 1000;
        /** Idle time, in seconds, after which an entry is eligible for removal. */
        private static final int CACHE_EXPIRE_TIME = 1800;
        /** Daemon thread, so the periodic cleanup never keeps the JVM alive on its own. */
        private static final ScheduledExecutorService CLEANUP_EXECUTOR = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "dubbo-reference-cache-cleanup");
            worker.setDaemon(true);
            return worker;
        });

        static {
            CLEANUP_EXECUTOR.scheduleWithFixedDelay(DubboReferenceCache::evictStaleEntries, 1, 1, TimeUnit.MINUTES);
        }

        /**
         * Returns the proxy for {@code clazz} resolved through {@code registry}, creating and
         * caching the underlying {@link ReferenceConfig} on first use.
         *
         * @param clazz    service interface; its name is part of the cache key
         * @param registry registry to resolve providers from; its id is part of the cache key
         * @param group    service group, ignored when blank
         * @param version  service version, ignored when blank
         * @param timeout  call timeout, ignored when {@code null}
         * @param <T>      type of the service interface
         * @return the object returned by {@link ReferenceConfig#get()}
         */
        public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version, Integer timeout) {
            return getOrCreate(clazz, registry, group, version, timeout, false, null);
        }

        /**
         * Returns the proxy for {@code clazz} connected directly to {@code url}, creating and
         * caching the underlying {@link ReferenceConfig} on first use. The registry is not set on
         * the reference; it only contributes its id to the cache key.
         *
         * @param clazz    service interface; its name is part of the cache key
         * @param registry only its id is used, as part of the cache key
         * @param group    service group, ignored when blank
         * @param version  service version, ignored when blank
         * @param timeout  call timeout, ignored when {@code null}
         * @param url      provider URL passed to {@link ReferenceConfig#setUrl(String)}
         * @param <T>      type of the service interface
         * @return the object returned by {@link ReferenceConfig#get()}
         */
        public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version, Integer timeout, String url) {
            return getOrCreate(clazz, registry, group, version, timeout, true, url);
        }

        /** Looks up the cache entry for the request and builds, stores and returns a new one on a miss. */
        private static <T> T getOrCreate(Class<T> clazz, RegistryConfig registry, String group, String version,
                                         Integer timeout, boolean direct, String url) {
            String cacheKey = buildCacheKey(clazz, registry, group, version, direct, url);

            synchronized (referenceConfigMap) {
                CacheEntry cacheEntry = referenceConfigMap.get(cacheKey);
                if (cacheEntry != null) {
                    // Refresh the timestamp so the idle timer restarts on every hit.
                    cacheEntry.setCreatedTimestamp(System.currentTimeMillis());
                    // Safe: the key embeds clazz.getName(), so this entry was created for the same interface.
                    @SuppressWarnings("unchecked")
                    ReferenceConfig<T> cached = (ReferenceConfig<T>) cacheEntry.getReferenceConfig();
                    return cached.get();
                }

                ReferenceConfig<T> referenceConfig = new ReferenceConfig<>();
                referenceConfig.setInterface(clazz);
                referenceConfig.setRetries(ApiConstant.MAX_REGISTRY_RETRIES);
                // Do not fail at creation time when no provider is currently available.
                referenceConfig.setCheck(false);
                // Defer initialisation of the underlying reference.
                referenceConfig.setLazy(true);
                // Declare the reference as a generic (GenericService) one.
                referenceConfig.setGeneric("true");
                if (StrUtil.isNotBlank(group)) {
                    referenceConfig.setGroup(group);
                }
                if (StrUtil.isNotBlank(version)) {
                    referenceConfig.setVersion(version);
                }
                if (ObjectUtil.isNotNull(timeout)) {
                    referenceConfig.setTimeout(timeout);
                }
                if (direct) {
                    referenceConfig.setUrl(url);
                } else {
                    referenceConfig.setRegistry(registry);
                }

                referenceConfigMap.put(cacheKey, new CacheEntry(System.currentTimeMillis(), referenceConfig));

                if (referenceConfigMap.size() > MAX_CACHE_SIZE) {
                    // Over capacity: drop the least recently used entry (head of the access-ordered map).
                    Iterator<CacheEntry> eldest = referenceConfigMap.values().iterator();
                    CacheEntry evicted = eldest.next();
                    eldest.remove();
                    destroyQuietly(evicted);
                }

                return referenceConfig.get();
            }
        }

        /** Builds a key that distinguishes interface, registry id, group, version and connection target. */
        private static String buildCacheKey(Class<?> clazz, RegistryConfig registry, String group, String version,
                                            boolean direct, String url) {
            StringBuilder key = new StringBuilder(clazz.getName()).append('_').append(registry.getId());
            key.append("|group=").append(StrUtil.isNotBlank(group) ? group : "");
            key.append("|version=").append(StrUtil.isNotBlank(version) ? version : "");
            key.append(direct ? "|url=" + url : "|registry");
            return key.toString();
        }

        /** Periodic task: removes idle (or surplus) entries, starting from the least recently used. */
        private static void evictStaleEntries() {
            synchronized (referenceConfigMap) {
                long now = System.currentTimeMillis();
                long maxIdleMillis = TimeUnit.SECONDS.toMillis(CACHE_EXPIRE_TIME);
                Iterator<Map.Entry<String, CacheEntry>> iter = referenceConfigMap.entrySet().iterator();
                while (iter.hasNext()) {
                    CacheEntry candidate = iter.next().getValue();
                    boolean overCapacity = referenceConfigMap.size() > MAX_CACHE_SIZE;
                    boolean idleTooLong = (now - candidate.getCreatedTimestamp()) > maxIdleMillis;
                    if (!overCapacity && !idleTooLong) {
                        // Entries are ordered by last access, so everything after this one is fresher.
                        break;
                    }
                    iter.remove();
                    destroyQuietly(candidate);
                }
            }
        }

        /** Destroys the evicted reference; failures are contained so they cannot cancel the periodic task. */
        private static void destroyQuietly(CacheEntry entry) {
            try {
                entry.getReferenceConfig().destroy();
            } catch (RuntimeException ignored) {
                // Best-effort cleanup: the entry is already out of the cache, and an exception escaping
                // a scheduled task would stop all future cleanup runs.
            }
        }

        /** A cached reference together with the time it was last requested. */
        private static class CacheEntry {
            /** Set at creation and refreshed on every cache hit (epoch milliseconds). */
            private long createdTimestamp;
            private final ReferenceConfig<?> referenceConfig;

            public CacheEntry(long createdTimestamp, ReferenceConfig<?> referenceConfig) {
                this.createdTimestamp = createdTimestamp;
                this.referenceConfig = referenceConfig;
            }

            public long getCreatedTimestamp() {
                return createdTimestamp;
            }

            public void setCreatedTimestamp(long createdTimestamp) {
                this.createdTimestamp = createdTimestamp;
            }

            public ReferenceConfig<?> getReferenceConfig() {
                return referenceConfig;
            }
        }
    }
    
    package com.jinw.utils.rpc;
    
    import com.jinw.constant.ApiConstant;
    import org.apache.dubbo.config.ApplicationConfig;
    import org.apache.dubbo.config.RegistryConfig;
    import org.apache.dubbo.rpc.RpcContext;
    import org.apache.dubbo.rpc.model.ApplicationModel;
    import org.apache.dubbo.rpc.service.GenericService;
    
    import javax.annotation.Nullable;
    
    /**
     * Remote invocation helper built on Dubbo's generic invocation ({@link GenericService}).
     *
     * <p>References are obtained from {@link DubboReferenceCache}. After every call, whether it
     * returned or threw, the attachments of the current {@link RpcContext} are cleared so that
     * later calls on the same thread are not affected by them.
     *
     * @author 20024968@cnsuning.com
     * @author liuxin
     * @version 1.0.0
     */
    public class DubboRpcClient {

        private final RegistryConfig registry;

        /**
         * @param application application config, installed into Dubbo's config manager
         * @param registry    registry handed to {@link DubboReferenceCache} for every lookup
         */
        public DubboRpcClient(ApplicationConfig application, RegistryConfig registry) {
            this.registry = registry;
            ApplicationModel.getConfigManager().setApplication(application);
        }

        /** Invokes {@code methodName} with no group, no version and no explicit timeout. */
        public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args) {
            return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, null, null);
        }

        /** Invokes {@code methodName} on the given version, using {@code ApiConstant.MAX_REFERENCE_TIMEOUT}. */
        public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, String version) {
            return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, version, ApiConstant.MAX_REFERENCE_TIMEOUT);
        }

        /**
         * Generic RPC call resolved through the registry passed to the constructor.
         *
         * @param interfaceClass service interface to call
         * @param methodName     name of the remote method
         * @param parameterTypes parameter type names of the remote method
         * @param args           argument values
         * @param group          optional service group
         * @param version        optional service version
         * @param timeout        optional call timeout
         * @return the value returned by {@link GenericService#$invoke}
         */
        public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, @Nullable String group, @Nullable String version, @Nullable Integer timeout) {
            GenericService genericService = (GenericService) DubboReferenceCache.getReference(interfaceClass, registry, group, version, timeout);
            return invokeAndClearAttachments(genericService, methodName, parameterTypes, args);
        }

        /**
         * Generic RPC call sent to an explicit Dubbo provider URL.
         *
         * @param interfaceClass service interface to call
         * @param methodName     name of the remote method
         * @param parameterTypes parameter type names of the remote method
         * @param args           argument values
         * @param group          optional service group
         * @param version        optional service version
         * @param timeout        optional call timeout
         * @param url            provider URL handed to the reference cache
         * @return the value returned by {@link GenericService#$invoke}
         */
        public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, @Nullable String group, @Nullable String version, @Nullable Integer timeout, @Nullable String url) {
            GenericService genericService = (GenericService) DubboReferenceCache.getReference(interfaceClass, registry, group, version, timeout, url);
            return invokeAndClearAttachments(genericService, methodName, parameterTypes, args);
        }

        /** Performs the generic call and always resets the RpcContext attachments, even on failure. */
        private static Object invokeAndClearAttachments(GenericService genericService, String methodName, String[] parameterTypes, Object[] args) {
            try {
                return genericService.$invoke(methodName, parameterTypes, args);
            } finally {
                // Reset the RpcContext so other calls on this thread are not affected.
                RpcContext.getContext().clearAttachments();
            }
        }
    }
    
    package com.jinw.utils.rpc;
    
    import com.alibaba.fastjson.JSON;
    import com.jinw.constant.ApiConstant;
    import com.jinw.system.entity.SysUser;
    import com.jinw.utils.SecurityUtils;
    import org.apache.commons.lang.StringUtils;
    import org.apache.dubbo.rpc.RpcContext;
    import org.apache.dubbo.rpc.RpcException;
    
    /**
     * Helper for reading the current user during a Dubbo call.
     *
     * @author liuxin
     * @version 1.0
     */
    public class RpcUtils {
        /**
         * Resolves the current user.
         *
         * <p>If the current {@link RpcContext} carries a non-blank {@code ApiConstant.CURRENT_USER}
         * attachment, that JSON is parsed into a {@link SysUser}. Otherwise the user is taken from
         * {@link SecurityUtils#getCurrentUser()}.
         *
         * @return the user parsed from the attachment, or the one reported by {@code SecurityUtils}
         * @throws RpcException if the {@code SecurityUtils} lookup fails; the original exception
         *                      is preserved as the cause
         */
        public static SysUser getCurrentUser() {
            String currentUser = RpcContext.getContext().getAttachment(ApiConstant.CURRENT_USER);
            if (StringUtils.isNotBlank(currentUser)) {
                return JSON.parseObject(currentUser, SysUser.class);
            }

            try {
                return SecurityUtils.getCurrentUser();
            } catch (Exception e) {
                // Keep the cause so the real failure is visible in the stack trace.
                throw new RpcException("Rpc获取用户信息失败!", e);
            }
        }

    }
    
     /**
      * Looks up the providers of {@code interfaceClass} in ZooKeeper, keeps those whose Dubbo
      * application name belongs to {@code partyId}, and generically invokes {@code methodName}
      * on the first match through a direct URL.
      *
      * @param partyId        id of the target client; matched against the provider's application name
      * @param interfaceClass service interface to call
      * @param methodName     name of the remote method
      * @param parameterTypes parameter type names of the remote method
      * @param args           argument values
      * @return the result of the generic invocation
      * @throws RpcException if no provider for {@code partyId} is registered
      */
     private Object getGenericInvokeResult(Integer partyId, Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args) {
            // Literal replacement (no regex needed); leaves "host:port".
            String[] hostAndPort = zookeeperAddress.replace("zookeeper://", "").split(":");
            RegistryService registryService = ExtensionLoader.getExtensionLoader(RegistryFactory.class)
                    .getExtension("zookeeper")
                    .getRegistry(new URL("zookeeper",
                            hostAndPort[0],
                            Integer.parseInt(hostAndPort[1])));
            URL serviceUrl = new URL("dubbo", hostAndPort[0], 0,
                    interfaceClass.getName());
            // Restrict the lookup to the expected service version.
            serviceUrl = serviceUrl.addParameter("version", ApiConstant.VERSION);

            // Application names are built as "kingow-oa-client-" + partyId (see createDubboRpc), so match
            // the id as a whole trailing segment. A plain substring test would let party 99 match
            // "...-9999". NOTE(review): confirm every provider follows this naming scheme.
            String applicationSuffix = "-" + partyId;
            List<URL> urlList = registryService.lookup(serviceUrl).stream()
                    .filter(url -> {
                        String application = url.getParameter("application");
                        // Skip providers without an application parameter instead of failing with an NPE.
                        return application != null && application.endsWith(applicationSuffix);
                    })
                    .collect(Collectors.toList());

            if (urlList.isEmpty()) {
                throw new RpcException("未找到可用服务!");
            }

            DubboRpcClient rpc = createDubboRpc(partyId);
            // Call the first matching provider directly by its URL.
            return rpc.genericInvoke(interfaceClass, methodName, parameterTypes, args, null, ApiConstant.VERSION, ApiConstant.MAX_REFERENCE_TIMEOUT, urlList.get(0).toString());
        }
    
        /**
         * Creates a {@link DubboRpcClient} for the given party. The application name and the
         * registry id are both {@code "kingow-oa-client-" + partyId}; the registry uses the
         * zookeeper protocol at {@code zookeeperAddress}.
         *
         * @param partyId id appended to the application name and registry id
         * @return a new client bound to that application and registry
         */
        private DubboRpcClient createDubboRpc(Integer partyId) {
            ApplicationConfig applicationConfig = new ApplicationConfig("kingow-oa-client-" + partyId);

            RegistryConfig registryConfig = new RegistryConfig();
            // Protocol is set before the address, in the same order as before.
            registryConfig.setProtocol("zookeeper");
            registryConfig.setAddress(zookeeperAddress);
            registryConfig.setTimeout(ApiConstant.MAX_REGISTRY_TIMEOUT);
            registryConfig.setId("kingow-oa-client-" + partyId);

            return new DubboRpcClient(applicationConfig, registryConfig);
        }
    dubbo:
      application:
        # 服务名称,保持唯一
        name: kingow-oa-client-9999
        # zookeeper地址,用于向其注册服务
      registry:
        protocol: zookeeper
        address: zookeeper://192.168.2.122:2181
        timeout: 5000  # 如果zookeeper是放在远程服务器上超时时间请设置长一些,不然很容易超时连接失败
      protocol:
        name: dubbo
        port: -1
    #  consumer:
    #    filter: dubboAccountFilter
    #  provider:
    #    filter: dubboAccountFilter
    
    client:
     file:
      #本地文件上传路径配置
      uploadDir: D:\client\data\upload
      #python文件路径
     python:
       fileDir: D:\client\python
      #shell文件路径
     shell:
        fileDir: D:\client\shell
    
    #AT 模式
    #seata 管理面板 http://192.168.2.122:7091
    seata:
      enabled: true
      application-id: kingow-oa-client-9999
      tx-service-group: kingow-oa-client-9999-group
      enable-auto-data-source-proxy: true
      client:
        support:
          spring:
            datasource-autoproxy: true
      registry:
        type: zk
        zk:
          server-addr: 192.168.2.122:2181
          connect-timeout: 2000
          session-timeout: 6000
          username: ""
          password: ""
      service:
        grouplist:
          default: 192.168.2.122:8091
        vgroup-mapping:
          kingow-oa-client-9999-group: default

    dubbo的拦截器

    package com.jinw.api.filter;
    
    import cn.hutool.core.util.ObjectUtil;
    import com.alibaba.fastjson.JSON;
    import com.jinw.constant.ApiConstant;
    import com.jinw.system.entity.SysUser;
    import com.jinw.utils.SecurityUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.dubbo.common.constants.CommonConstants;
    import org.apache.dubbo.common.extension.Activate;
    import org.apache.dubbo.rpc.*;
    
    /**
     * Dubbo filter, active on both consumer and provider side, that makes sure every call
     * carries an {@code ApiConstant.CURRENT_USER} attachment.
     *
     * <p>If the current {@link RpcContext} already has a non-empty value for that key, the value
     * is written back unchanged. Otherwise the user reported by {@link SecurityUtils} is
     * serialised to JSON and attached; when that lookup fails, the JSON of {@code null} is
     * attached instead.
     *
     * @author ljh
     */
    //https://blog.csdn.net/qq_36882793/article/details/118111841
    //https://saint.blog.csdn.net/article/details/124110926?spm=1001.2101.3001.6650.8&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-8-124110926-blog-89025640.235%5Ev31%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-8-124110926-blog-89025640.235%5Ev31%5Epc_relevant_default_base3&utm_relevant_index=15
    //https://segmentfault.com/a/1190000040164855?utm_source=sf-similar-article
    @Slf4j
    @Activate(group = {CommonConstants.CONSUMER, CommonConstants.PROVIDER}, order = -30000)
    public class DubboAccountFilter implements Filter {
        /**
         * Ensures the current-user attachment is present, then continues the invocation chain.
         *
         * @param invoker    next invoker in the chain
         * @param invocation the call being made
         * @return the result of {@code invoker.invoke(invocation)}
         * @throws RpcException propagated from the downstream invoker
         */
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext rpcContext = RpcContext.getContext();

            boolean userAlreadyAttached = ObjectUtil.isNotEmpty(rpcContext.getObjectAttachments())
                    && ObjectUtil.isNotEmpty(rpcContext.getAttachment(ApiConstant.CURRENT_USER));

            // An existing value is written back as-is (kept from the original implementation;
            // NOTE(review): presumably so the value is forwarded on outgoing calls, confirm for the
            // Dubbo version in use). Otherwise fall back to the locally authenticated user.
            String currentUser = userAlreadyAttached
                    ? rpcContext.getAttachment(ApiConstant.CURRENT_USER)
                    : JSON.toJSONString(resolveLocalUser());
            rpcContext.setAttachment(ApiConstant.CURRENT_USER, currentUser);
            log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(), rpcContext.getAttachment(ApiConstant.CURRENT_USER));

            return invoker.invoke(invocation);
        }

        /** Returns the user from {@link SecurityUtils}, or {@code null} when the lookup fails. */
        private static SysUser resolveLocalUser() {
            try {
                return SecurityUtils.getCurrentUser();
            } catch (Exception e) {
                // Deliberately best-effort: a call without a resolvable user still goes through,
                // but the reason is now traceable instead of being swallowed silently.
                log.debug("DubboAccountFilter could not resolve the current user", e);
                return null;
            }
        }
    }
    本文著作权归作者 [ admin ] 享有,未经作者书面授权,禁止转载,封面图片来源于 [ 互联网 ] ,本文仅供个人学习、研究和欣赏使用。如有异议,请联系博主及时处理。
    取消回复

    发表留言
    回复

    首页邻居关于归档
    Copyright©2020-2026  All Rights Reserved.  Load:0.016 s
    京ICP备18019712号
    Theme by OneBlog V3.6.5
    夜间模式

    开源不易,请尊重作者版权,保留基本的版权信息。