dubbo自定义泛化调用接口

Published on May 9, 2023 in Java教程 with 0 comment

在做公司业务时,有需求是让server端调用特定的client端 我是根据dubbo.application.name来判断的

```java
package com.jinw.utils.rpc;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.jinw.constant.ApiConstant;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Cache of generic Dubbo {@link ReferenceConfig} instances.
 *
 * <p>Entries live in an access-ordered {@link LinkedHashMap}, so iteration starts at the least
 * recently used entry. A background task runs once a minute and evicts entries that have been
 * idle for longer than {@link #CACHE_EXPIRE_TIME} seconds or that exceed {@link #MAX_CACHE_SIZE}.
 * Every access to the map is guarded by the map's own monitor.
 *
 * @author liuxin
 * @since 2023/4/20
 */
public class DubboReferenceCache {

    /** Maximum number of cached references before the least recently used one is evicted. */
    private static final int MAX_CACHE_SIZE = 1000;

    /** Idle time, in seconds, after which a cached reference is evicted. */
    private static final int CACHE_EXPIRE_TIME = 1800;

    /** Access-ordered cache (eldest entry first); guarded by {@code synchronized (referenceConfigMap)}. */
    private static final Map<String, CacheEntry> referenceConfigMap = new LinkedHashMap<>(16, 0.75f, true);

    /** Single daemon thread running the periodic eviction, so it never blocks JVM shutdown. */
    private static final ScheduledExecutorService CLEANUP_EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "dubbo-reference-cache-cleanup");
                thread.setDaemon(true);
                return thread;
            });

    static {
        CLEANUP_EXECUTOR.scheduleWithFixedDelay(DubboReferenceCache::evictStaleEntries, 1, 1, TimeUnit.MINUTES);
    }

    /**
     * Returns the (possibly cached) generic service proxy that is resolved through a registry.
     *
     * <p>The reference is declared generic, so the returned object is a Dubbo generic proxy;
     * callers in this project cast it to {@code GenericService}.
     *
     * @param clazz    service interface
     * @param registry registry used to discover providers; its id is part of the cache key
     * @param group    service group, ignored when blank
     * @param version  service version, ignored when blank
     * @param timeout  invocation timeout in milliseconds, ignored when {@code null}
     * @return the service proxy
     */
    public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version,
                                     Integer timeout) {
        return getOrCreate(clazz, registry, group, version, timeout, null);
    }

    /**
     * Returns the (possibly cached) generic service proxy that connects directly to {@code url}.
     *
     * <p>When {@code url} is blank this behaves like the registry-based overload.
     *
     * @param clazz    service interface
     * @param registry registry configuration; its id is part of the cache key
     * @param group    service group, ignored when blank
     * @param version  service version, ignored when blank
     * @param timeout  invocation timeout in milliseconds, ignored when {@code null}
     * @param url      provider URL for a point-to-point connection
     * @return the service proxy
     */
    public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version,
                                     Integer timeout, String url) {
        return getOrCreate(clazz, registry, group, version, timeout, url);
    }

    /** Looks up the reference for the given settings, creating and caching it on a miss. */
    @SuppressWarnings("unchecked") // the entry stored under this key was created for the same interface
    private static <T> T getOrCreate(Class<T> clazz, RegistryConfig registry, String group, String version,
                                     Integer timeout, String url) {
        String cacheKey = buildCacheKey(clazz, registry, group, version, timeout, url);
        synchronized (referenceConfigMap) {
            CacheEntry cached = referenceConfigMap.get(cacheKey);
            if (cached != null) {
                // Cache hit: refresh the idle timer and hand out the existing proxy.
                cached.touch();
                return ((ReferenceConfig<T>) cached.getReferenceConfig()).get();
            }

            ReferenceConfig<T> referenceConfig = newReferenceConfig(clazz, registry, group, version, timeout, url);
            referenceConfigMap.put(cacheKey, new CacheEntry(System.currentTimeMillis(), referenceConfig));

            if (referenceConfigMap.size() > MAX_CACHE_SIZE) {
                // Over capacity: drop the least recently used entry (first in access order).
                Iterator<CacheEntry> eldest = referenceConfigMap.values().iterator();
                CacheEntry evicted = eldest.next();
                eldest.remove();
                destroyQuietly(evicted.getReferenceConfig());
            }
            return referenceConfig.get();
        }
    }

    /**
     * Builds the cache key from every setting that shapes the reference, so that calls with a
     * different group, version, timeout or direct URL never share a proxy.
     */
    private static String buildCacheKey(Class<?> clazz, RegistryConfig registry, String group, String version,
                                        Integer timeout, String url) {
        // The registry id suffix distinguishes the different client endpoints.
        return clazz.getName() + "_" + registry.getId() + "_" + group + "_" + version + "_" + timeout + "_" + url;
    }

    /** Creates a lazy, generic, non-checking reference for the given settings. */
    private static <T> ReferenceConfig<T> newReferenceConfig(Class<T> clazz, RegistryConfig registry, String group,
                                                             String version, Integer timeout, String url) {
        ReferenceConfig<T> referenceConfig = new ReferenceConfig<>();
        referenceConfig.setInterface(clazz);
        // Retry count
        referenceConfig.setRetries(ApiConstant.MAX_REGISTRY_RETRIES);
        // Do not require a provider to be available at reference time
        referenceConfig.setCheck(false);
        // Initialise the actual connection lazily
        referenceConfig.setLazy(true);
        // Declare the reference as a generic interface
        referenceConfig.setGeneric("true");
        if (StrUtil.isNotBlank(url)) {
            // Point-to-point: connect to the given provider URL instead of using the registry
            referenceConfig.setUrl(url);
        } else {
            referenceConfig.setRegistry(registry);
        }
        if (StrUtil.isNotBlank(group)) {
            referenceConfig.setGroup(group);
        }
        if (StrUtil.isNotBlank(version)) {
            referenceConfig.setVersion(version);
        }
        if (ObjectUtil.isNotNull(timeout)) {
            // Invocation timeout
            referenceConfig.setTimeout(timeout);
        }
        return referenceConfig;
    }

    /** Evicts, eldest first, every entry that is expired or that exceeds the capacity. */
    private static void evictStaleEntries() {
        long expireMillis = TimeUnit.SECONDS.toMillis(CACHE_EXPIRE_TIME);
        synchronized (referenceConfigMap) {
            long now = System.currentTimeMillis();
            Iterator<Map.Entry<String, CacheEntry>> iter = referenceConfigMap.entrySet().iterator();
            while (iter.hasNext()) {
                CacheEntry entry = iter.next().getValue();
                boolean overCapacity = referenceConfigMap.size() > MAX_CACHE_SIZE;
                boolean expired = now - entry.getLastAccessTimestamp() > expireMillis;
                if (!overCapacity && !expired) {
                    // Access order means every later entry was used more recently; stop here.
                    break;
                }
                iter.remove();
                destroyQuietly(entry.getReferenceConfig());
            }
        }
    }

    /** Releases the resources held by an evicted reference. */
    private static void destroyQuietly(ReferenceConfig<?> referenceConfig) {
        try {
            referenceConfig.destroy();
        } catch (RuntimeException ignored) {
            // Best effort: a failed destroy must neither break the caller nor cancel the
            // scheduled cleanup task (an escaping exception would stop all future runs).
        }
    }

    /** A cached reference together with the time it was last handed out. */
    private static class CacheEntry {

        private long lastAccessTimestamp;
        private final ReferenceConfig<?> referenceConfig;

        CacheEntry(long lastAccessTimestamp, ReferenceConfig<?> referenceConfig) {
            this.lastAccessTimestamp = lastAccessTimestamp;
            this.referenceConfig = referenceConfig;
        }

        long getLastAccessTimestamp() {
            return lastAccessTimestamp;
        }

        /** Marks the entry as used right now. */
        void touch() {
            this.lastAccessTimestamp = System.currentTimeMillis();
        }

        ReferenceConfig<?> getReferenceConfig() {
            return referenceConfig;
        }
    }
}
```

```java
package com.jinw.utils.rpc;

import com.jinw.constant.ApiConstant;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.service.GenericService;

import javax.annotation.Nullable;

/**
 * Remote invocation based on Dubbo's generic-call feature.
 *
 * @author 20024968@cnsuning.com
 * @version 1.0.0
 */
public class DubboRpcClient {

    private final RegistryConfig registry;

    /**
     * Creates a client bound to {@code registry} and registers {@code application} with the
     * Dubbo config manager.
     */
    public DubboRpcClient(ApplicationConfig application, RegistryConfig registry) {
        this.registry = registry;
        ApplicationModel.getConfigManager().setApplication(application);
    }

    /** Generic invocation without group, version or timeout. */
    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args) {
        return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, null, null);
    }

    /** Generic invocation of a specific service version, using {@code ApiConstant.MAX_REFERENCE_TIMEOUT}. */
    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args,
                                String version) {
        return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, version,
                ApiConstant.MAX_REFERENCE_TIMEOUT);
    }

    /**
     * Generic RPC invocation resolved through the registry.
     *
     * @param interfaceClass service interface
     * @param methodName     name of the remote method
     * @param parameterTypes fully qualified parameter type names
     * @param args           invocation arguments
     * @param group          service group, may be {@code null}
     * @param version        service version, may be {@code null}
     * @param timeout        timeout in milliseconds, may be {@code null}
     * @return the remote call's result
     * @author liuxin
     */
    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args,
                                @Nullable String group, @Nullable String version, @Nullable Integer timeout) {
        GenericService genericService =
                (GenericService) DubboReferenceCache.getReference(interfaceClass, registry, group, version, timeout);
        return invokeAndClearAttachments(genericService, methodName, parameterTypes, args);
    }

    /**
     * Generic RPC invocation that connects directly to the given Dubbo provider URL.
     *
     * @param interfaceClass service interface
     * @param methodName     name of the remote method
     * @param parameterTypes fully qualified parameter type names
     * @param args           invocation arguments
     * @param group          service group, may be {@code null}
     * @param version        service version, may be {@code null}
     * @param timeout        timeout in milliseconds, may be {@code null}
     * @param url            provider URL, may be {@code null}
     * @return the remote call's result
     * @author liuxin
     */
    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args,
                                @Nullable String group, @Nullable String version, @Nullable Integer timeout,
                                @Nullable String url) {
        GenericService genericService = (GenericService) DubboReferenceCache.getReference(
                interfaceClass, registry, group, version, timeout, url);
        return invokeAndClearAttachments(genericService, methodName, parameterTypes, args);
    }

    /** Performs the generic call and always resets the RpcContext attachments afterwards. */
    private static Object invokeAndClearAttachments(GenericService genericService, String methodName,
                                                    String[] parameterTypes, Object[] args) {
        try {
            return genericService.$invoke(methodName, parameterTypes, args);
        } finally {
            // Reset the RpcContext so later calls on this thread are not affected,
            // even when the invocation fails.
            RpcContext.getContext().clearAttachments();
        }
    }
}
```

```java
package com.jinw.utils.rpc;

import com.alibaba.fastjson.JSON;
import com.jinw.constant.ApiConstant;
import com.jinw.system.entity.SysUser;
import com.jinw.utils.SecurityUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;

/**
 * Helpers for reading caller information inside an RPC invocation.
 *
 * @author liuxin
 * @since 2023/4/4
 * @version 1.0
 */
public class RpcUtils {

    /**
     * Returns the current user.
     *
     * <p>The user is parsed from the {@code ApiConstant.CURRENT_USER} attachment of the
     * RpcContext when that attachment is not blank; otherwise it is read from
     * {@link SecurityUtils#getCurrentUser()}.
     *
     * @return the current user; {@code null} when the attachment holds the JSON text {@code null}
     * @throws RpcException when no attachment is present and the local lookup fails
     */
    public static SysUser getCurrentUser() {
        String currentUser = RpcContext.getContext().getAttachment(ApiConstant.CURRENT_USER);
        if (StringUtils.isNotBlank(currentUser)) {
            return JSON.parseObject(currentUser, SysUser.class);
        }
        try {
            return SecurityUtils.getCurrentUser();
        } catch (Exception e) {
            // Keep the cause so the real failure is visible in the stack trace.
            throw new RpcException("Rpc获取用户信息失败!", e);
        }
    }
}
```

```java
/**
 * Invokes {@code methodName} on the provider that belongs to the given party.
 *
 * <p>Looks up the providers of {@code interfaceClass} (version {@code ApiConstant.VERSION}) in
 * ZooKeeper, keeps those whose {@code application} parameter equals the party's client
 * application name, and calls the first one directly by URL.
 *
 * @throws RpcException when no provider of that party is registered
 */
private Object getGenericInvokeResult(Integer partyId, Class<?> interfaceClass, String methodName,
                                      String[] parameterTypes, Object[] args) {
    // zookeeperAddress is expected to look like "zookeeper://host:port"
    String[] hostAndPort = zookeeperAddress.replace("zookeeper://", "").split(":");
    String zookeeperHost = hostAndPort[0];
    int zookeeperPort = Integer.parseInt(hostAndPort[1]);

    RegistryService registryService = ExtensionLoader.getExtensionLoader(RegistryFactory.class)
            .getExtension("zookeeper")
            .getRegistry(new URL("zookeeper", zookeeperHost, zookeeperPort));

    // Add the version parameter to the lookup URL
    URL serviceUrl = new URL("dubbo", zookeeperHost, 0, interfaceClass.getName())
            .addParameter("version", ApiConstant.VERSION);

    // Collect the available providers of this party. The application name is compared exactly
    // (and null-safely): a substring match would let party 99 hit "kingow-oa-client-9999".
    String targetApplication = clientApplicationName(partyId);
    List<URL> urlList = registryService.lookup(serviceUrl).stream()
            .filter(url -> targetApplication.equals(url.getParameter("application")))
            .collect(Collectors.toList());

    if (ObjectUtil.isEmpty(urlList)) {
        throw new RpcException("未找到可用服务!");
    }

    // Call the provided method on the first matching provider
    DubboRpcClient rpc = createDubboRpc(partyId);
    return rpc.genericInvoke(interfaceClass, methodName, parameterTypes, args, null, ApiConstant.VERSION,
            ApiConstant.MAX_REFERENCE_TIMEOUT, urlList.get(0).toString());
}

/** Creates an RPC client whose application name and registry id are derived from the party id. */
private DubboRpcClient createDubboRpc(Integer partyId) {
    String applicationName = clientApplicationName(partyId);
    ApplicationConfig application = new ApplicationConfig(applicationName);

    RegistryConfig registry = new RegistryConfig();
    registry.setProtocol("zookeeper");
    registry.setAddress(zookeeperAddress);
    registry.setTimeout(ApiConstant.MAX_REGISTRY_TIMEOUT);
    registry.setId(applicationName);

    return new DubboRpcClient(application, registry);
}

/** Returns the dubbo.application.name used by the client of the given party. */
private String clientApplicationName(Integer partyId) {
    return "kingow-oa-client-" + partyId;
}
```

```yaml
# NOTE(review): the indentation of this file was lost in the page extraction and has been
# reconstructed from the usual Dubbo / Seata property layout - confirm against the real file.
dubbo:
  application:
    # Service name, must be unique
    name: kingow-oa-client-9999
  # ZooKeeper address the service registers with
  registry:
    protocol: zookeeper
    address: zookeeper://192.168.2.122:2181
    timeout: 5000 # Use a longer timeout when ZooKeeper runs on a remote server, otherwise connections time out easily
  protocol:
    name: dubbo
    port: -1
#  consumer:
#    filter: dubboAccountFilter
#  provider:
#    filter: dubboAccountFilter
client:
  file:
    # Local file upload directory
    uploadDir: D:\client\data\upload
  # Python file directory
  python:
    fileDir: D:\client\python
  # Shell file directory
  shell:
    fileDir: D:\client\shell
# AT mode
# Seata admin console http://192.168.2.122:7091
seata:
  enabled: true
  application-id: kingow-oa-client-9999
  tx-service-group: kingow-oa-client-9999-group
  enable-auto-data-source-proxy: true
  client:
    support:
      spring:
        datasource-autoproxy: true
  registry:
    type: zk
    zk:
      server-addr: 192.168.2.122:2181
      connect-timeout: 2000
      session-timeout: 6000
      username: ""
      password: ""
  service:
    grouplist:
      default: 192.168.2.122:8091
    vgroup-mapping:
      kingow-oa-client-9999-group: default
```

dubbo的拦截器

```java
package com.jinw.api.filter;

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.jinw.constant.ApiConstant;
import com.jinw.system.entity.SysUser;
import com.jinw.utils.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;

/**
 * Dubbo filter that makes sure every invocation carries the current user in the
 * {@code ApiConstant.CURRENT_USER} attachment.
 *
 * <p>An attachment that is already present is left untouched. Otherwise the user is read from
 * {@link SecurityUtils#getCurrentUser()} and stored as JSON; when that lookup fails the JSON
 * text {@code null} is stored instead.
 *
 * @author ljh
 * @since 2020/09/27
 */
//https://blog.csdn.net/qq_36882793/article/details/118111841
//https://saint.blog.csdn.net/article/details/124110926?spm=1001.2101.3001.6650.8&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-8-124110926-blog-89025640.235%5Ev31%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-8-124110926-blog-89025640.235%5Ev31%5Epc_relevant_default_base3&utm_relevant_index=15
//https://segmentfault.com/a/1190000040164855?utm_source=sf-similar-article
@Slf4j
@Activate(group = {CommonConstants.CONSUMER, CommonConstants.PROVIDER}, order = -30000)
public class DubboAccountFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        if (ObjectUtil.isEmpty(rpcContext.getAttachment(ApiConstant.CURRENT_USER))) {
            // No user was propagated with this call: attach the locally authenticated one.
            rpcContext.setAttachment(ApiConstant.CURRENT_USER, JSON.toJSONString(resolveLocalUser()));
        }
        log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(),
                rpcContext.getAttachment(ApiConstant.CURRENT_USER));
        // The CURRENT_USER attachment is intentionally left in place after the call.
        return invoker.invoke(invocation);
    }

    /** Returns the locally authenticated user, or {@code null} when it cannot be determined. */
    private SysUser resolveLocalUser() {
        try {
            return SecurityUtils.getCurrentUser();
        } catch (Exception e) {
            // Best effort: calls made without a security context still go through, without a user.
            log.debug("DubboAccountFilter could not resolve the current user", e);
            return null;
        }
    }
}
```

本文由 admin 创作,采用 知识共享署名4.0 国际许可协议进行许可。本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。