dubbo自定义泛化调用接口
warning:
这篇文章距离上次修改已过622天,其中的内容可能已经有所变动。
在做公司业务时,有需求是让server端调用特定的client端
我是根据dubbo.application.name来判断的
package com.jinw.utils.rpc;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.jinw.constant.ApiConstant;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Author @liuxin
* @Description dubbo rpc调用缓存类
* @Date 2023/4/20 16:47
*/
public class DubboReferenceCache {
private static final Map<String, CacheEntry> referenceConfigMap = new LinkedHashMap<>(16, 0.75f, true);
private static final int MAX_CACHE_SIZE = 1000;
private static final int CACHE_EXPIRE_TIME = 1800;
private static final ScheduledExecutorService CLEANUP_EXECUTOR = Executors.newSingleThreadScheduledExecutor();
static {
CLEANUP_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
synchronized (referenceConfigMap) {
long now = System.currentTimeMillis();
for (Iterator<Map.Entry<String, CacheEntry>> iter = referenceConfigMap.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<String, CacheEntry> entry = iter.next();
if (referenceConfigMap.size() > MAX_CACHE_SIZE || (now - entry.getValue().getCreatedTimestamp()) > (CACHE_EXPIRE_TIME * 1000)) {
iter.remove();
} else {
break;
}
}
}
}
}, 1, 1, TimeUnit.MINUTES);
}
public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version, Integer timeout) {
//加后缀用于区分不同端
String cacheKey = clazz.getName() + "_" + registry.getId();
synchronized (referenceConfigMap) {
// 获取缓存中的服务引用实例
CacheEntry cacheEntry = referenceConfigMap.get(cacheKey);
if (cacheEntry != null) {
// 如果缓存中存在,则更新时间戳并返回实际服务引用
cacheEntry.setCreatedTimestamp(System.currentTimeMillis());
return ((ReferenceConfig<T>) cacheEntry.getReferenceConfig()).get();
}
// 如果缓存中不存在,则创建一个新的 ReferenceConfig 对象并加入到缓存中
ReferenceConfig<T> referenceConfig = new ReferenceConfig<>();
referenceConfig.setInterface(clazz);
// 设置重试次数
referenceConfig.setRetries(ApiConstant.MAX_REGISTRY_RETRIES);
//不需要检查服务提供者是否存在
referenceConfig.setCheck(false);
referenceConfig.setRegistry(registry);
// 延迟初始化服务引用实例
referenceConfig.setLazy(true);
// 声明为泛化接口
referenceConfig.setGeneric("true");
if (StrUtil.isNotBlank(group)) {
referenceConfig.setGroup(group);
}
if (StrUtil.isNotBlank(version)) {
referenceConfig.setVersion(version);
}
if (ObjectUtil.isNotNull(timeout)) {
// 超时时间
referenceConfig.setTimeout(timeout);
}
cacheEntry = new CacheEntry(System.currentTimeMillis(), referenceConfig);
referenceConfigMap.put(cacheKey, cacheEntry);
if (referenceConfigMap.size() > MAX_CACHE_SIZE) {
// 如果缓存数量超过限制,则清理最老的一个元素
String oldestKey = referenceConfigMap.keySet().iterator().next();
referenceConfigMap.remove(oldestKey);
}
return referenceConfig.get();
}
}
public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version, Integer timeout, String url) {
//加后缀用于区分不同端
String cacheKey = clazz.getName() + "_" + registry.getId();
synchronized (referenceConfigMap) {
// 获取缓存中的服务引用实例
CacheEntry cacheEntry = referenceConfigMap.get(cacheKey);
if (cacheEntry != null) {
// 如果缓存中存在,则更新时间戳并返回实际服务引用
cacheEntry.setCreatedTimestamp(System.currentTimeMillis());
return ((ReferenceConfig<T>) cacheEntry.getReferenceConfig()).get();
}
// 如果缓存中不存在,则创建一个新的 ReferenceConfig 对象并加入到缓存中
ReferenceConfig<T> referenceConfig = new ReferenceConfig<>();
referenceConfig.setInterface(clazz);
// 设置重试次数
referenceConfig.setRetries(ApiConstant.MAX_REGISTRY_RETRIES);
//不需要检查服务提供者是否存在
referenceConfig.setCheck(false);
// 延迟初始化服务引用实例
referenceConfig.setLazy(true);
// 声明为泛化接口
referenceConfig.setGeneric("true");
if (StrUtil.isNotBlank(group)) {
referenceConfig.setGroup(group);
}
if (ObjectUtil.isNotNull(timeout)) {
// 超时时间
referenceConfig.setTimeout(timeout);
}
referenceConfig.setUrl(url);
cacheEntry = new CacheEntry(System.currentTimeMillis(), referenceConfig);
referenceConfigMap.put(cacheKey, cacheEntry);
if (referenceConfigMap.size() > MAX_CACHE_SIZE) {
// 如果缓存数量超过限制,则清理最老的一个元素
String oldestKey = referenceConfigMap.keySet().iterator().next();
referenceConfigMap.remove(oldestKey);
}
return referenceConfig.get();
}
}
private static class CacheEntry {
private long createdTimestamp;
private ReferenceConfig<?> referenceConfig;
public CacheEntry(long createdTimestamp, ReferenceConfig<?> referenceConfig) {
this.createdTimestamp = createdTimestamp;
this.referenceConfig = referenceConfig;
}
public long getCreatedTimestamp() {
return createdTimestamp;
}
public void setCreatedTimestamp(long createdTimestamp) {
this.createdTimestamp = createdTimestamp;
}
public ReferenceConfig<?> getReferenceConfig() {
return referenceConfig;
}
}
}
package com.jinw.utils.rpc;
import com.jinw.constant.ApiConstant;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.service.GenericService;
import javax.annotation.Nullable;
/**
* 功能描述: 基于 Dubbo 泛化调用特性的远程调用
*
* @author 20024968@cnsuning.com
* @version 1.0.0
*/
public class DubboRpcClient {
private RegistryConfig registry;
public DubboRpcClient(ApplicationConfig application, RegistryConfig registry) {
this.registry = registry;
ApplicationModel.getConfigManager().setApplication(application);
}
public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args) {
return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, null, null);
}
public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, String version) {
return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, version, ApiConstant.MAX_REFERENCE_TIMEOUT);
}
/**
* @return java.lang.Object
* @Author @liuxin
* @Description dubbo泛化 rpc调用
* @Date 2023/4/23 16:42
*/
public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, @Nullable String group, @Nullable String version, @Nullable Integer timeout) {
GenericService genericService = (GenericService) DubboReferenceCache.getReference(interfaceClass, registry, group, version, timeout);
Object $invoke = genericService.$invoke(methodName, parameterTypes, args);
// 重置RpcContext对象,避免其他的调用受到影响
RpcContext.getContext().clearAttachments();
return $invoke;
}
/**
* @return java.lang.Object
* @Author @liuxin
* @Description 通过dubbo的url进行调用
* @Date 2023/4/23 16:42
*/
public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, @Nullable String group, @Nullable String version, @Nullable Integer timeout, @Nullable String url) {
GenericService genericService = (GenericService) DubboReferenceCache.getReference(interfaceClass, registry, group, version, timeout, url);
Object $invoke = genericService.$invoke(methodName, parameterTypes, args);
// 重置RpcContext对象,避免其他的调用受到影响
RpcContext.getContext().clearAttachments();
return $invoke;
}
}
package com.jinw.utils.rpc;
import com.alibaba.fastjson.JSON;
import com.jinw.constant.ApiConstant;
import com.jinw.system.entity.SysUser;
import com.jinw.utils.SecurityUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
/**
* @ClassName RpcUserUtils
* @Description TODO
* @Author @liuxin
* @Date 2023/4/4 17:29
* @Version 1.0
*/
public class RpcUtils {
public static SysUser getCurrentUser() {
RpcContext context = RpcContext.getContext();
String currentUser = context.getAttachment(ApiConstant.CURRENT_USER);
if (StringUtils.isNotBlank(currentUser)) {
return JSON.parseObject(currentUser, SysUser.class);
}
if (StringUtils.isBlank(currentUser)) {
try {
return SecurityUtils.getCurrentUser();
} catch (Exception e) {
throw new RpcException("Rpc获取用户信息失败!");
}
}
return null;
}
}
private Object getGenericInvokeResult(Integer partyId, Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args) {
String[] zookeeperAddressSpilts = zookeeperAddress.replaceAll("zookeeper://", "").split(":");
RegistryService registryService = ExtensionLoader.getExtensionLoader(RegistryFactory.class)
.getExtension("zookeeper")
.getRegistry(new URL("zookeeper",
zookeeperAddressSpilts[0],
Integer.valueOf(zookeeperAddressSpilts[1])));
URL serviceUrl = new URL("dubbo", zookeeperAddressSpilts[0], 0,
interfaceClass.getName());
serviceUrl = serviceUrl.addParameter("version", ApiConstant.VERSION); // 在URL中添加版本号参数
// 获取可用的服务列表
List<URL> urlList = registryService.lookup(serviceUrl).stream().filter(url -> url.getParameter("application").contains(partyId.toString())).collect(Collectors.toList());
// 判断服务列表中是否包含指定的服务
if (ObjectUtil.isNotEmpty(urlList)) {
DubboRpcClient rpc = createDubboRpc(partyId);
// 调用提供的方法
Object result = rpc.genericInvoke(interfaceClass, methodName, parameterTypes, args, null, ApiConstant.VERSION, ApiConstant.MAX_REFERENCE_TIMEOUT, urlList.get(0).toString());
return result;
} else {
throw new RpcException("未找到可用服务!");
}
}
private DubboRpcClient createDubboRpc(Integer partyId) {
ApplicationConfig application = new ApplicationConfig("kingow-oa-client-" + partyId);
RegistryConfig registry = new RegistryConfig();
registry.setProtocol("zookeeper");
registry.setAddress(zookeeperAddress);
registry.setTimeout(ApiConstant.MAX_REGISTRY_TIMEOUT);
registry.setId("kingow-oa-client-" + partyId);
DubboRpcClient rpc = new DubboRpcClient(application, registry);
return rpc;
}
dubbo:
application:
# 服务名称,保持唯一
name: kingow-oa-client-9999
# zookeeper地址,用于向其注册服务
registry:
protocal: zookeeper
address: zookeeper://192.168.2.122:2181
timeout: 5000 # 如果zookeeper是放在远程服务器上超时时间请设置长一些,不然很容易超时连接失败
protocol:
name: dubbo
port: -1
# consumer:
# filter: dubboAccountFilter
# provider:
# filter: dubboAccountFilter
client:
file:
#本地文件上传路径配置
uploadDir: D:\client\data\upload
#python文件路径
python:
fileDir: D:\client\python
#shell文件路径
shell:
fileDir: D:\client\shell
#AT 模式
#seata 管理面板 http://192.168.2.122:7091
seata:
enabled: true
application-id: kingow-oa-client-9999
tx-service-group: kingow-oa-client-9999-group
enable-auto-data-source-proxy: true
client:
support:
spring:
datasource-autoproxy: true
registry:
type: zk
zk:
server-addr: 192.168.2.122:2181
connect-timeout: 2000
session-timeout: 6000
username: ""
password: ""
service:
grouplist:
default: 192.168.2.122:8091
vgroup-mapping:
kingow-oa-client-9999-group: default
dubbo的拦截器
package com.jinw.api.filter;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.jinw.constant.ApiConstant;
import com.jinw.system.entity.SysUser;
import com.jinw.utils.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
/**
* @Description dubbo拦截
* @Author ljh
* @Date 2020:09:27 11:47
*/
//https://blog.csdn.net/qq_36882793/article/details/118111841
//https://saint.blog.csdn.net/article/details/124110926?spm=1001.2101.3001.6650.8&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-8-124110926-blog-89025640.235%5Ev31%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-8-124110926-blog-89025640.235%5Ev31%5Epc_relevant_default_base3&utm_relevant_index=15
//https://segmentfault.com/a/1190000040164855?utm_source=sf-similar-article
@Slf4j
@Activate(group = {CommonConstants.CONSUMER, CommonConstants.PROVIDER}, order = -30000)
public class DubboAccountFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
if (ObjectUtil.isNotEmpty(rpcContext.getObjectAttachments())) {
if (ObjectUtil.isNotEmpty(rpcContext.getAttachment(ApiConstant.CURRENT_USER))) {
rpcContext.setAttachment(ApiConstant.CURRENT_USER, rpcContext.getAttachment(ApiConstant.CURRENT_USER));
log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(), rpcContext.getAttachment(ApiConstant.CURRENT_USER));
} else {
SysUser user = null;
try {
user = SecurityUtils.getCurrentUser();
} catch (Exception e) {
}
// 设置参数
rpcContext.setAttachment(ApiConstant.CURRENT_USER, JSON.toJSONString(user));
log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(), rpcContext.getAttachment(ApiConstant.CURRENT_USER));
}
} else {
SysUser user = null;
try {
user = SecurityUtils.getCurrentUser();
} catch (Exception e) {
}
// 设置参数
rpcContext.setAttachment(ApiConstant.CURRENT_USER, JSON.toJSONString(user));
log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(), rpcContext.getAttachment(ApiConstant.CURRENT_USER));
}
try {
return invoker.invoke(invocation);
} finally {
// 清空CURRENT_USER参数
// rpcContext.removeAttachment(ApiConstant.CURRENT_USER);
}
}
}