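Dubbo's filter mechanism plays a role similar to filters in Spring MVC. With it we can implement many features, such as token passing, uniform exception handling, and distributed tracing. It is also simple to use: we only need to write a class that implements Filter and expose it through Dubbo's SPI mechanism.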
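import java.util.Map;

import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.MDC; // assumption: the MDC used here is SLF4J's

// MsConstants is the project's own constants class; its import is omitted here.
public class DubboFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Read the trace ID from the MDC
        String logId = MDC.get(MsConstants.TRACE_LOG_ID);
        // Attach it to the invocation so that it travels with the RPC request
        Map<String, String> attachments = invocation.getAttachments();
        attachments.put(MsConstants.TRACE_LOG_ID, logId);
        return invoker.invoke(invocation);
    }
}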
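To make the idea of passing a value through attachments more concrete, here is a minimal sketch that uses only the JDK and does not call any Dubbo API. The names TraceAttachmentSketch, CONTEXT, consumerFilter, providerFilter and the key "traceLogId" are all hypothetical: CONTEXT stands in for the MDC, the plain Map stands in for the invocation attachments, and a second thread plays the role of the remote provider. It also shows a provider-side counterpart, which the filter above does not include.

```java
import java.util.HashMap;
import java.util.Map;

/** Stdlib-only model of passing a trace ID through invocation attachments. */
class TraceAttachmentSketch {
    // Hypothetical key; it stands in for MsConstants.TRACE_LOG_ID.
    static final String TRACE_LOG_ID = "traceLogId";

    // Stand-in for a per-thread logging context such as the MDC.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    /** Consumer side: copy the trace ID from the local context into the attachments. */
    static void consumerFilter(Map<String, String> attachments) {
        String logId = CONTEXT.get().get(TRACE_LOG_ID);
        if (logId != null) {
            attachments.put(TRACE_LOG_ID, logId);
        }
    }

    /** Provider side: restore the trace ID from the attachments into the local context. */
    static void providerFilter(Map<String, String> attachments) {
        String logId = attachments.get(TRACE_LOG_ID);
        if (logId != null) {
            CONTEXT.get().put(TRACE_LOG_ID, logId);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> attachments = new HashMap<>(); // travels with the request
        CONTEXT.get().put(TRACE_LOG_ID, "abc-123");
        consumerFilter(attachments);

        // A separate thread plays the role of the remote provider.
        Thread provider = new Thread(() -> {
            providerFilter(attachments);
            System.out.println("provider sees " + CONTEXT.get().get(TRACE_LOG_ID));
        });
        provider.start();
        provider.join();
    }
}
```

The null checks are a design choice of this sketch: when no trace ID is present, nothing is written, so a missing ID never overwrites an existing value on the other side.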
In our earlier post on ReferenceBean in Dubbo's remote invocation implementation, we mentioned that the invoker is created by REF_PROTOCOL.refer(interfaceClass, urls.get(0)):
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
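REF_PROTOCOL, in turn, is loaded through Dubbo's SPI mechanism. If we open the Protocol interface, we find that it carries the annotation @SPI("dubbo"), which means the default Protocol extension is the one named dubbo. Which class is that? The answer is in the file META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol inside the Dubbo jar: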
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
native-thrift=org.apache.dubbo.rpc.protocol.nativethrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
xmlrpc=org.apache.dubbo.xml.rpc.protocol.xmlrpc.XmlRpcProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
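From this file we can see that the class being loaded is DubboProtocol. But so far we have only loaded this class; what does that have to do with Dubbo's filter mechanism? The link is a special piece of logic in Dubbo's SPI mechanism: after the requested extension class has been found and instantiated, any wrapper classes registered for the same extension point are also applied, each one wrapping the instance created so far.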
private T createExtension(String name) {
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        // Create (or reuse) the instance of the extension class itself
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        injectExtension(instance);
        // Wrap the instance with every wrapper class found for this extension point
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        initExtension(instance);
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}
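The wrapping step can be illustrated with a small JDK-only sketch. It is a simplified model under stated assumptions, not Dubbo's ExtensionLoader: WrapperLoaderSketch, DubboLikeProtocol, FilterWrapper and the one-method Protocol interface are hypothetical names, and dependency injection and instance caching are left out. What it keeps is the rule that a class with a public constructor taking the extension interface is treated as a wrapper, and that every wrapper is applied around the named extension.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of wrapper handling in an SPI-style loader. */
class WrapperLoaderSketch {
    /** Hypothetical, heavily reduced stand-in for the Protocol extension point. */
    interface Protocol {
        String refer(String url);
    }

    /** A plain extension: it has no constructor that takes a Protocol. */
    public static class DubboLikeProtocol implements Protocol {
        public String refer(String url) {
            return "invoker(" + url + ")";
        }
    }

    /** A wrapper: its public constructor takes the extension interface. */
    public static class FilterWrapper implements Protocol {
        private final Protocol delegate;

        public FilterWrapper(Protocol delegate) {
            this.delegate = delegate;
        }

        public String refer(String url) {
            return "filtered(" + delegate.refer(url) + ")";
        }
    }

    private final Map<String, Class<? extends Protocol>> extensionClasses = new LinkedHashMap<>();
    private final List<Class<? extends Protocol>> wrapperClasses = new ArrayList<>();

    /** Wrappers are kept in their own list; plain extensions are stored by name. */
    void register(String name, Class<? extends Protocol> clazz) {
        if (isWrapper(clazz)) {
            wrapperClasses.add(clazz);
        } else {
            extensionClasses.put(name, clazz);
        }
    }

    static boolean isWrapper(Class<?> clazz) {
        try {
            clazz.getConstructor(Protocol.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Instantiate the named extension, then wrap it with every registered wrapper. */
    Protocol createExtension(String name) {
        Class<? extends Protocol> clazz = extensionClasses.get(name);
        if (clazz == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        try {
            Protocol instance = clazz.getDeclaredConstructor().newInstance();
            for (Class<? extends Protocol> wrapperClass : wrapperClasses) {
                instance = wrapperClass.getConstructor(Protocol.class).newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate extension: " + name, e);
        }
    }

    public static void main(String[] args) {
        WrapperLoaderSketch loader = new WrapperLoaderSketch();
        loader.register("filter", FilterWrapper.class);
        loader.register("dubbo", DubboLikeProtocol.class);
        // prints "filtered(invoker(dubbo://demo))"
        System.out.println(loader.createExtension("dubbo").refer("dubbo://demo"));
    }
}
```

Asking for the extension named dubbo returns the wrapper, not the bare protocol, which is why callers go through the wrapper's logic without ever naming it.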
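In the file we just looked at, the first Protocol implementation listed, ProtocolFilterWrapper, is exactly such a wrapper class, and it is the one responsible for filters. If we open it, we can see that it wraps each Dubbo filter in an Invoker, so that the custom logic in the filter runs as part of the invocation. ProtocolFilterWrapper walks through the filter list from the last filter to the first, and the Invoker built for each filter holds a reference to the Invoker built in the previous iteration, that is, the next one in the call chain. This way of chaining handlers is commonly called the chain of responsibility pattern.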
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        // Build the chain from back to front, so that the first filter ends up outermost
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result asyncResult;
                    try {
                        // Run this filter and hand it the next invoker in the chain
                        asyncResult = filter.invoke(next, invocation);
                    } catch (Exception e) {
                        if (filter instanceof ListenableFilter) {// Deprecated!
                            Filter.Listener listener = ((ListenableFilter) filter).listener();
                            if (listener != null) {
                                listener.onError(e, invoker, invocation);
                            }
                        } else if (filter instanceof Filter.Listener) {
                            Filter.Listener listener = (Filter.Listener) filter;
                            listener.onError(e, invoker, invocation);
                        }
                        throw e;
                    } finally {
                    }
                    // Notify the filter's listener once the (possibly asynchronous) result completes
                    return asyncResult.whenCompleteWithContext((r, t) -> {
                        if (filter instanceof ListenableFilter) {// Deprecated!
                            Filter.Listener listener = ((ListenableFilter) filter).listener();
                            if (listener != null) {
                                if (t == null) {
                                    listener.onMessage(r, invoker, invocation);
                                } else {
                                    listener.onError(t, invoker, invocation);
                                }
                            }
                        } else if (filter instanceof Filter.Listener) {
                            Filter.Listener listener = (Filter.Listener) filter;
                            if (t == null) {
                                listener.onMessage(r, invoker, invocation);
                            } else {
                                listener.onError(t, invoker, invocation);
                            }
                        } else {// Deprecated!
                            filter.onResponse(r, invoker, invocation);
                        }
                    });
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}
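Stripped of the listener callbacks and the asynchronous result handling, the chain construction above comes down to a few lines. The following is a minimal JDK-only sketch of that idea; FilterChainSketch, the String-based Invoker and Filter interfaces, and the tracing helper are hypothetical simplifications, not Dubbo's types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Minimal model of building a filter chain from back to front. */
class FilterChainSketch {
    /** Hypothetical stand-in for Dubbo's Invoker: takes an invocation, returns a result. */
    interface Invoker {
        String invoke(String invocation);
    }

    /** Hypothetical stand-in for Dubbo's Filter: receives the next invoker in the chain. */
    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    static Invoker buildInvokerChain(Invoker invoker, List<Filter> filters) {
        Invoker last = invoker;
        // Iterate backwards so that filters.get(0) becomes the outermost invoker.
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    /** A filter that records when it runs, before and after delegating to the next invoker. */
    static Filter tracing(String name, List<String> log) {
        return (next, invocation) -> {
            log.add(name + " before");
            String result = next.invoke(invocation);
            log.add(name + " after");
            return result;
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Invoker target = invocation -> {
            log.add("target");
            return "result of " + invocation;
        };
        Invoker chain = buildInvokerChain(target,
                Arrays.asList(tracing("A", log), tracing("B", log)));
        System.out.println(chain.invoke("sayHello")); // prints "result of sayHello"
        System.out.println(log); // prints "[A before, B before, target, B after, A after]"
    }
}
```

Because the loop runs from the last filter to the first, the filters execute in list order on the way in and in reverse order on the way out, which is the usual behavior of a chain of responsibility.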
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
You can read this line as loading both the filters we write ourselves and Dubbo's default filters, again by way of Dubbo's SPI mechanism. Dubbo