`
attend
  • 浏览: 34767 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

dubbo源代码阅读

 
阅读更多

一,自定义的spring配置

基于sping 扩展schma 利用 DubboNamespaceHandler 实现对自定义schema的解析。见配置文件:spring.handlers spring.schemas

 

二,Consumer对于服务接口的透明调用

 

基于Javassist的动态代理模式,自动生成代理类。

通过InvokerInvocationHandlerinvoker调用:

return invoker.invoke(new RpcInvocation(method, args)).recreate(); 

invoker  RPC通信,基于minanetty等。

 

三,dubbo扩展机制

实现方式类似sunspi模式,实现自身的可扩展性。简单实现了接口的注入。

1Extension 注解 value=组件的名字

具体实现见ExtensionLoader

 

2,主要方法:

loadExtensionClasses 加载所有实现了META-INF/services目录下文件中的类,文件名为接口名。根据Extension注解的名字为keyCLASSVALUE放到缓存的MAP中。

getAdaptiveExtension 利用代码生成创建一下接口的适配器类:

 

Protocol

Cluster

ProxyFactory

等等

 

这个适配器类以Adaptive注解声明的值或者接口名为KEY,从URL中的参数或者URL getProtocol() 作为key的值,

然后ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(key ) 获得相应的实例。

 

 

getExtension(STR)方法获得接口实例,第一次取时创建,并按顺序实例化包装类,并把最后一个包装类返回。

 

Class<?> clazz = getExtensionClasses().get(name);

        if (clazz == null) {

            throw new IllegalStateException("No such extension " + type.getName() + " by name " + name);

        }

        try {

//实例化并自动注入一些接口的适配器

            T instance = injectExtension((T) clazz.newInstance());

            Set<Class<?>> wrapperClasses = cachedWrapperClasses;

            if (wrapperClasses != null && wrapperClasses.size() > 0) {

                for (Class<?> wrapperClass : wrapperClasses) {

                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));

//injectExtension 实例化包装类,并注入接口的适配器,  注意这个地方返回的是最后一个包装类。

                }

            }

            return instance;

        } catch (Throwable t) {

            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +

                    type + ")  could not be instantiated: " + t.getMessage(), t);

        }

 

比如获得Protocol接口的实例   private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 过程如下:

ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 利用javassist动态生成一个Protocol$Adaptive的实例,

生成的class反编译后代码如下:

 

//消费者调用这个方法获得Invoker

  public Invoker refer(Class paramClass, URL paramURL)

    throws RpcException

  {

    if (paramURL == null)

      throw new IllegalArgumentException("url == null");

    URL localURL = paramURL;

//默认的协议dubbo

    String str = (localURL.getProtocol() == null) ? "dubbo" : localURL.getProtocol();

    if (str == null)

      throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + localURL.toString() + ") use keys([protocol])");

    

 

/*注意这个地方获得的Protocol实例是:

     *com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper 包装类。主要是为了实现

     *invoker的包装。    

     *com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper->

     *com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper->

     *com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol

*层层包装创建ListenerInvokerWrapper,实现InvokerListener调用;创建InvokerChain 实现Filter的调用。

*默认的客户端过滤链 "consumercontext", "compatible", "deprecated", "collect", "genericimpl", "activelimit", "monitor", "future" 

*com.alibaba.dubbo.rpc.RpcConstants.DEFAULT_REFERENCE_FILTERS

*/

Protocol localProtocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(str);

    return localProtocol.refer(paramClass, paramURL);

  }

 

  //服务提供者调用这个方法,发布服务。

  public Exporter export(Invoker paramInvoker)

    throws RpcException

  {

    if (paramInvoker == null)

      throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

    if (paramInvoker.getUrl() == null)

      throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");

    URL localURL = paramInvoker.getUrl();

    String str = (localURL.getProtocol() == null) ? "dubbo" : localURL.getProtocol();

    if (str == null)

      throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + localURL.toString() + ") use keys([protocol])");

/*这个地方获得的Protocol实例同样是

*com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper 包装类,实现对于对

*invoker的包装

*加载的服务端过滤链,默认是  "context", "token", "exception", "echo", "generic", "accesslog", "trace", "classloader", "executelimit", "monitor" ,"timeout"

*com.alibaba.dubbo.rpc.RpcConstants.DEFAULT_SERVICE_FILTERS

*/

Protocol localProtocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(str);

    return localProtocol.export(paramInvoker);

  }

  

  

之所以默认获得的Protocol实例是com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper,因为在配置文件中

META-INF/services/com.alibaba.dubbo.rpc.Protocol文件内容如下:

com.alibaba.dubbo.registry.support.RegistryProtocol

com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper

com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper --最后一个包装类

com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol

com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol

com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol

com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol

 

 

四,Wrapper帮助类

 

提高调用某一个类的某一个方法的性能(避免反射调用)

使用javassist动态生成一个Wrapper的子类,实现抽象方法invokeMethod

/**

 * invoke method.

 * 

 * @param instance instance.

 * @param mn method name.

 * @param types 

 * @param args argument array.

 * @return return value.

 */

abstract public Object invokeMethod(Object instance, String mn, Class<?>[] types, Object[] args) throws NoSuchMethodException, InvocationTargetException;

 

 

生成的class的代码类似

 

public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)

    throws InvocationTargetException

  {

    RegistryService localRegistryService;

    try

    {

      localRegistryService = (RegistryService)paramObject;

    }

    catch (Throwable localThrowable1)

    {

      throw new IllegalArgumentException(localThrowable1);

    }

    try

    {

      if ("register".equals(paramString))

      {

        localRegistryService.register((URL)paramArrayOfObject[0]);

        return null;

      }

      if ("subscribe".equals(paramString))

      {

        localRegistryService.subscribe((URL)paramArrayOfObject[0], (NotifyListener)paramArrayOfObject[1]);

        return null;

      }

      if ("unregister".equals(paramString))

      {

        localRegistryService.unregister((URL)paramArrayOfObject[0]);

        return null;

      }

      if ("unsubscribe".equals(paramString))

      {

        localRegistryService.unsubscribe((URL)paramArrayOfObject[0], (NotifyListener)paramArrayOfObject[1]);

        return null;

      }

    }

    catch (Throwable localThrowable2)

    {

      throw new InvocationTargetException(localThrowable2);

    }

    throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class com.alibaba.dubbo.registry.RegistryService.");

  }

 直接动态生成了一个子类,没有通过反射调用。

 

 

Consumer 服务消费方分析

dubbo-demo-simple-consumer的源码为分析的起点,解析Consumer的运行流程。

1,配置文件的解析

<!-- 引用远程服务配置 -->

<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService"/>

首先dubbo基于springschma扩展机制实现了自定义的命名空间定义和配置的解析。

源码见:com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser

 

 registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); 

DubboBeanDefinitionParserReferenceBean实现对dubbo:reference的解析。

 

2,获得服务代理

 

 

在  DemoService demoService = (DemoService)context.getBean("demoService");  ReferenceBean作为FactoryBean实现DemoService接口的代理对象的创建,见源码:

com.alibaba.dubbo.config.spring.ReferenceBean.getObject()
com.alibaba.dubbo.config.ReferenceConfig.get()
com.alibaba.dubbo.config.ReferenceConfig.init()

 

 

ReferenceConfig.java

 

获得protocol,cluster,proxyFactory接口的实例,实际调用如下:

 

protocol    --Protocol$Adaptive- com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper--

cluster     --com.alibaba.dubbo.rpc.cluster.support.FailoverCluster

proxyFactory--com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory

 

createProxy 方法解析:

I,获得URL,从dubbo:reference url属性或者从loadRegistries();通过注册中心配置拼装URL。设置URLprotocolConstants.REGISTRY_PROTOCOL registry

II获得 invoker = protocol.refer(interfaceClass, urls.get(0)); 

Protocol$Adaptive-->根据URL的协议获得 com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper->com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper-->com.alibaba.dubbo.registry.support.RegistryProtocol

    refer(interfaceClass, urls.get(0)) ->  com.alibaba.dubbo.registry.support.RegistryProtocol.refer->

1RegistryFactory--getRegistry  RegistryFactory根据url配置可能是DubboRegistryFactoryMulticastRegistryFactoryZookeeperRegistryFactory

2new RegistryDirectory  创建注册中心目录服务

3registry.subscribe 订阅服务,--rpc远程访问registryService,RegistryDirectory作为 NotifyListener 

        -->RegistryDirectory.notify(urls)-->urls-toInvokers //此时生成了接口及接口的方法对应的invoker列表

4,cluster.merge(directory)   默认FailoverCluster生成FailoverClusterInvoker(RegistryDirectory)

创建invoker完成

 

III,创建代理(T) proxyFactory.getProxy(invoker)  --JavassistProxyFactory-》  (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

使用Javassist创建了两个CLASS

public class Proxy1 extends Proxy

  implements ClassGenerator.DC

{

  public Object newInstance(InvocationHandler paramInvocationHandler)

  {

    return new proxy1(paramInvocationHandler);

  }

}

 

public class proxy1

  implements ClassGenerator.DC, EchoService, DemoService

{

  public static Method[] methods;

  private InvocationHandler handler;

 

  public String sayHello(String paramString)

  {

    Object[] arrayOfObject = new Object[1];

    arrayOfObject[0] = paramString;

    Object localObject = this.jdField_handler_of_type_JavaLangReflectInvocationHandler.invoke(this, jdField_methods_of_type_ArrayOfJavaLangReflectMethod[0], arrayOfObject);

    return ((String)localObject);

  }

 

  public Object $echo(Object paramObject)

  {

    Object[] arrayOfObject = new Object[1];

    arrayOfObject[0] = paramObject;

    Object localObject = this.jdField_handler_of_type_JavaLangReflectInvocationHandler.invoke(this, jdField_methods_of_type_ArrayOfJavaLangReflectMethod[1], arrayOfObject);

    return ((Object)localObject);

  }

 

  public proxy1(InvocationHandler paramInvocationHandler)

  {

    this.jdField_handler_of_type_JavaLangReflectInvocationHandler = paramInvocationHandler;

  }

}

 

返回proxy1xxx接口的子类。

 

 

4,动态代理背后的故事,以dubbo协议的通信为例

 

从生成的proxy1的代码我们可以看到  sayHello(String str)时调用了 InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)

-----invoker.invoke(new RpcInvocation(method, args)).recreate();

这里的invoker是对 com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker 的层层包装,实现负载均衡、失败转移(FailoverClusterInvoker)、InvokerListenerInvokerChain的顺序调用。

Protocol.refer生成。

 

如:FailoverClusterInvoker-》    

    public Result invoke(final Invocation invocation) throws RpcException {

 

        if(destroyed){

            throw new RpcException("Rpc invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost() 

                    + " use dubbo version " + Version.getVersion()

                    + " is not destroyed! Can not invoke any more.");

        }

        

        LoadBalance loadbalance;

        

        List<Invoker<T>> invokers = directory.list(invocation);//从服务目录中找到所有的invoker,处理了router ,目前router只有ScriptRouter实现。

        if (invokers != null && invokers.size() > 0) {

//加载负载均衡算法

            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()

                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));

        } else {

            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);

        }

 

        return doInvoke(invocation, invokers, loadbalance);

    }

 

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        if (invokers == null || invokers.size() == 0)

            throw new RpcException("No provider available for service " + getInterface().getName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", Please check whether the service do exist or version is right firstly, and check the provider has started.");

 

        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;

        if (len <= 0)

            len = 1;

 

        // retry loop.

        RpcException le = null; // last exception.

        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(invokers.size()); // invoked invokers.

        Set<URL> providers = new HashSet<URL>(len);

//挨个试,如果失败就继续。实现Failover

        for (int i = 0; i < len; i++) {

            //boolean pp = false; // is provider problem.

            Invoker<T> invoker = select(loadbalance, invocation, invokers, invoked);

            invoked.add(invoker);

            providers.add(invoker.getUrl());

            try {

                return invoker.invoke(invocation);

            } catch (RpcException e) {

                if (e.isBiz()) throw e;

 

                le = e;

                //pp = true;

            } catch (Throwable e) // biz exception.

            {

                throw new RpcException(e.getMessage(), e);

            } finally {

                //if (pp) // if provider problem, fail over.

                //    inv.setWeight(0);

            }

        }

        List<URL> urls = new ArrayList<URL>(invokers.size());

        for(Invoker<T> invoker : invokers){

            if(invoker != null ) 

                urls.add(invoker.getUrl());

        }

        throw new RpcException(le.getCode(),"Tried " + len + " times to invoke providers " + providers + " " + loadbalance.getClass().getAnnotation(Extension.class).value() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + (le != null ? le.getMessage() : ""), le);

    }

}

 

 

再来具体看看实际执行远程调用DubboInvoker

DubboInvokercom.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol 创建-

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {

        // find client.

        int channels = url.getPositiveParameter(Constants.CONNECTIONS_KEY, 1);

        ExchangeClient[] clients = new ExchangeClient[channels];

        if ( channels == 1){

            clients[0]  = getOrInitClient(url);

        } else {

            for (int i = 0; i < clients.length; i++) {

                clients[i] = getOrInitClient(url);  //默认初始化了一个LazyConnectExchangeClient-->init 时 Exchangers.connect(url, requestHandler);

            }

        }

        

        System.out.println(serviceType.getName()+":"+clients.length);

        // create rpc invoker.

        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, clients);

        invokers.add(invoker);

        return invoker;

    }

  在Protocol里创建了,ExchangeClient,及 requestHandler

 

   

  DubboInvoker.doInvoke(final Invocation invocation) -->  (Result) currentClient.request(inv, timeout).get();  调用ExchangeClient发起请求

  

  默认通过NETTY框架通信。

  见com.alibaba.dubbo.remoting.transport.netty.NettyClient

  对协议的encode,decode实现:

  com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec   对于配置文件中的

  

  <dubbo:method name="subscribe">

<dubbo:argument index="1" callback="true" />

  </dubbo:method>

 

 callbacktrue的,DubboCodec在服务端自动生成参数代理, 通过RPC远程调用消费者的方法。此时的invoker为 ChannelWrappedInvoker 

 

 ChannelWrappedInvoker发起的请求,由DubboProtocolrequestHandler处理  received--》  

   if (message instanceof Invocation) {

                reply((ExchangeChannel) channel, message);

}

 

 比如消费者 subscribe 时,com.alibaba.dubbo.registry.support.SimpleRegistryService 处理完成后要调用消费者的NotifyLisenter .notify(urls)  

 

 消费者在发送请求时,DubboCodec根据URL中配置的方法的某一个参数的callback属性是否为true自动发布服务,以接受服务端的callback

 


Remote模块分析

客户端: dubbo协议,netty实现

发送请求:

HeaderExchangeClient --> request -->

   HeaderExchangeChannel-->request--> 

-->req=new Request()

-->DefaultFuture future new DefaultFuture(channel, req, timeout);

-->NettyClient->send(req)

-->NettyChannel-->send(req,boolean sent) 

-->NettyHander-->writeRequested

---> encode 

--->Netty->channel+writebuffer,writetask 放到IO线程的任务队列

--->NioWorker -->processWriteTaskQueue

--->write0 IO线程里发送数据

--->NettyClient-->sent

--->DefaultChannelHandler-->sent

--->HeaderExchangeHandler-->sent

--->DubboProtocol requestHandler -->sent

--->DefaultFuture.sent(channel, request);

--->DefaultFuture.doSent()  标记数据发送完的时间

接收返回结果:

--->decode

NettyHander --> messageReceived

--->DefaultChannelHandler --->received


   -->O线程解码完后,交给线程池处理。

executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.RECEIVED, message));

-->ChannelEventRunnable -->run 

-->HeaderExchangeHandler.received

----->HeaderExchangeHandler--handleResponse

--->DefaultFuture.received(channel, response);

 

 

  

 

总体上说,dubboRPC做了很好的封装,能够实现透明的远程调用,在消费端实现了对于服务端调用的负载均衡算法、支持服务的集群。并且提供了监控接口,可通过WEB界面了解服务的情况(请求次数、有哪些服务、服务有几个提供者)等。 提供的注册中心支持服务的注册、取消注册,并通知消费端服务列表的变更。

 

没时间排版,有兴趣的凑合着看吧。

  

 

 

   

  

 

 

    

 

 

 

 

 

 

 

<!--EndFragment-->

1
0
分享到:
评论
3 楼 moonlight2010 2014-12-29  
楼主多谢几篇关于dubbo的文章,最近我再学习,不胜感激!
2 楼 weida-liu 2012-11-20  
不错   支持楼主多些几篇
1 楼 cheng_xiao 2012-02-07  
感谢

相关推荐

Global site tag (gtag.js) - Google Analytics