淘先锋技术网

首页 1 2 3 4 5 6 7

Dubbo的filter机制相当于springmvc中的过滤器,利用它我们可以实现很多功能,如token传递,异常的通用处理,分布式链路的实现等等。其使用也相当简单,只需要我们写一个类实现Filter即可。

public class DubboFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //从MDC中获取
        String logId = MDC.get(MsConstants.TRACE_LOG_ID);
        Map<String, String> attachments = invocation.getAttachments();
        attachments.put(MsConstants.TRACE_LOG_ID, logId);
        return invoker.invoke(invocation);
    }
}

        在之前我们分享的Dubbo远程调用实现之ReferenceBean中提到过的invoker是通过REF_PROTOCOL.refer(interfaceClass, urls.get(0))生成的

    private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

        而REF_PROTOCOL又是通过dubbo的spi机制加载进来的。我们点开Protocol类可以发现Protocol有一个注解@SPI("dubbo"),这说明了protocol默认是dubbo的protocol,那么dubbo的protocol是哪个类呢?在dubbo的jar包META-INFO/org.apache.dubbo.rpc.Protocol文件中

filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
native-thrift=org.apache.dubbo.rpc.protocol.nativethrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
xmlrpc=org.apache.dubbo.xml.rpc.protocol.xmlrpc.XmlRpcProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper

        从这个文件中我们可以看出我们加载的是DubboProtocol这个类。但是我们只是加载了这个类,和dubbo的filter机制有什么关系呢?别着急,在dubbo的spi机制中有一个特殊逻辑,在找到了指定的加载类后,如果发现还存在包装类,还会将包装类加载进来。

    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            initExtension(instance);
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }

        而在刚刚的文件中第一个protocol实现类ProtocolFilterWrapper就是一个filter的包装类。点开这个类,我们可以看到这个类实际上就是将dubbo的filter包装成一个invoker来执行filter中我们加的特殊逻辑。ProtocolFilterWrapper通过不断遍历filter,当前的filter持有上一个filter的引用来完成filter机制。这种实现方式我们通常也称之为责任链模式

   private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            if (filter instanceof ListenableFilter) {// Deprecated!
                                Filter.Listener listener = ((ListenableFilter) filter).listener();
                                if (listener != null) {
                                    listener.onError(e, invoker, invocation);
                                }
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                listener.onError(e, invoker, invocation);
                            }
                            throw e;
                        } finally {

                        }
                        return asyncResult.whenCompleteWithContext((r, t) -> {
                            if (filter instanceof ListenableFilter) {// Deprecated!
                                Filter.Listener listener = ((ListenableFilter) filter).listener();
                                if (listener != null) {
                                    if (t == null) {
                                        listener.onMessage(r, invoker, invocation);
                                    } else {
                                        listener.onError(t, invoker, invocation);
                                    }
                                }
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                if (t == null) {
                                    listener.onMessage(r, invoker, invocation);
                                } else {
                                    listener.onError(t, invoker, invocation);
                                }
                            } else {// Deprecated!
                                filter.onResponse(r, invoker, invocation);
                            }
                        });
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }

        return last;
    }

        

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        这段代码大家可以理解成,加载我们写的filter以及dubbo默认的filter,也是利用了dubbo的spi机制。dubbo