写点什么

Ribbon 从入门到源码解析

作者:李子捌
  • 2022 年 4 月 01 日
  • 本文字数:10517 字

    阅读完需:约 35 分钟

Ribbon从入门到源码解析

1、简介

在微服务架构中,服务拆分成一个个的微服务,并且以集群化的方式进行部署;此时服务与服务之间的调用变得复杂了起来,客户端需要自主选择调用服务端集群中的某个服务,这就是我们经常说到的客户端负载均衡,在 Spring Cloud 生态中使用的比较广泛的技术是 Ribbon。

2、案例

无论是使用 Fegin 还是 RestTemplate 发起服务调用,客户端负载均衡均是通过 Ribbon 来实现,这里使用 RestTemplate 演示案例。

2.1 搭建服务注册中心 EurekaServer

  • pom 依赖

<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency>
复制代码
  • application.yml

server:  port: 18888
spring:  application:    name: eurekaServer
eureka:  client:#    fetch-registry: false#    register-with-eureka: false    service-url:      defaultZone: http://127.0.0.1:18888/eureka
复制代码
  • 启动类

@EnableEurekaServer@SpringBootApplicationpublic class EurekaServerApplication {
    public static void main(String[] args) {        SpringApplication.run(EurekaServerApplication.class, args);    }
}
复制代码

2.2 搭建 order-service 服务

  • pom 依赖

<!--web--><dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-web</artifactId></dependency><!--EurekaClient--><dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>
复制代码
  • application.yml

# server portserver:  port: 18070
# namespring:  application:    name: order-service
# eureka servereureka:  client:    service-url:      defaultZone: http://127.0.0.1:18888/eureka
复制代码
  • 模拟业务代码

@RestController@RequestMapping("order")public class OrderController {        @Autowired    private OrderService orderService;        @GetMapping("{orderId}")    public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {        // 根据id查询订单并返回        return orderService.queryOrderById(orderId);    }}
复制代码


@Servicepublic class OrderService {
    @Autowired    private RestTemplate restTemplate;
    @Autowired    private OrderMapper orderMapper;
    public Order queryOrderById(Long orderId) {        // 1.查询订单        Order order = orderMapper.findById(orderId);        // 2、查询用户信息        if (Objects.nonNull(order)) {            String url = String.format("http://user-service/user/%s", order.getUserId());            User user = restTemplate.getForObject(url, User.class);            // 3、封装用户信息            order.setUser(user);        }        // 4.返回        return order;    }}
复制代码
  • 启动类中注入 RestTemplate 并开启负载均衡

@MapperScan("com.lzb.order.mapper")@SpringBootApplication@EnableEurekaClientpublic class OrderApplication {
    public static void main(String[] args) {        SpringApplication.run(OrderApplication.class, args);    }
    /**     * RestTemplate bean容器的注入     * LoadBalanced 负载均衡注解     * @return     */    @Bean    @LoadBalanced    public RestTemplate restTemplate() {        return new RestTemplate();    }
}
复制代码

2.3 搭建 user-service 服务

  • pom 依赖

<!--web--><dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-web</artifactId></dependency><!--EurekaClient--><dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>
复制代码
  • application.yml

# server portserver:  port: 18080
# namespring:  application:    name: user-service
# eureka servereureka:  client:    service-url:      defaultZone: http://127.0.0.1:18888/eureka
复制代码
  • 模拟业务代码

@RestController@RequestMapping("/user")public class UserController {
    @Autowired    private UserService userService;
    @GetMapping("/{id}")    public User queryById(@PathVariable("id") Long id) {        return userService.queryById(id);    }}
复制代码
  • 启动类

@MapperScan("com.lzb.user.mapper")@SpringBootApplication@EnableEurekaClientpublic class UserApplication {    public static void main(String[] args) {        SpringApplication.run(UserApplication.class, args);    }}
复制代码


2.4 服务启动

在上述服务搭建之后,可以看出 order-service 服务调用了 user-service 服务,因此我将 user-service 服务集群部署,并且在 order-service 注入了 RestTemplate 且标注了 LoadBalanced 注解;启动顺序如下所示:

  • 启动 EurekaServer

  • 启动 user-service

  • 启动 user-service2

  • 启动 order-service


关于 IDEA 如何集群启动某个服务,方式比较多,我这里介绍一种常用的方法,步骤如下:

  • 首先启动该服务,直至服务启动成功

  • 右键启动的服务,选择 Copy Configuration


  • Edit Configuration 中修改服务 Name;传入端口参数,在 Environment 中的 VM options 键入-Dserver.port=xxxx;点击 Apply;点击 OK 即可;


  • 启动服务,右上角选择刚刚编辑的服务信息,DEBUG 启动即可。


  • 服务启动后 Eureka Server 中服务注册信息如下所示

image.png

2.5 测试结果

清空 user-service 和 user-service2 的控制台日志,在浏览器中请求四次 order-service,order-service 中会通过 RestTemplate 调用 order-service,由于 RestTemplate 使用了 LoadBlanced 注解修饰,因此 Ribbon 托管了 RestTemplate,在发起调用之前会解析服务名获取服务 Ip 和 port,然后根据负载均衡策略选择服务进行调用!

可以在 console 打印的日志中看出,第一次请求大到了 user-service,第二次请求打到了 user-service1,第三次请求大到了 user-service,第四次请求打到了 user-service1



3、Ribbon 如何实现负载均衡

可以试想一下,如果是你本人去实现一个 Ribbon 的功能你会怎么做?我想大家的思路应该都差不多如下:

  • 拦截 Http 请求

  • 解析请求中的服务名

  • 在 Eureka Client 拉取的 Eureka Server 中注册的可用服务信息中,根据服务名获取服务 IP 和 Port 信息

  • 根据负载均衡策略选择服务提供者发起 http 请求

3.1 拦截 http 请求

在 springboot 中常用的拦截器有三个:

  • org.springframework.web.servlet.HandlerInterceptor

  • org.springframework.http.client.ClientHttpRequestInterceptor

  • feign.RequestInterceptor

三者均是对 http 请求进行拦截,但是 3 个拦截器应用的项目不同,HandlerInterceptor 主要是处理 http servlet 请求;ClientHttpRequestInterceptor 主要是处理 HttpTemplate 请求或者 Ribbon 请求;RequestInterceptor 用于处理 Fegin 请求,Fegin 本质上是 http 请求;因此很明显,Ribbon 实现的是 ClientHttpRequestInterceptor 拦截器。


3.2 解析请求中的服务名

org.springframework.http.client.ClientHttpRequestInterceptor 接口中只有一个方法 intercept(),其子类均会重写该方法 org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,在该方法入口处打上断点。并且在浏览器中访问 order-service,order-service 中会使用 RestTemplate 请求 user-service

此时可以看到 request.getURI()得到的是http://user-service/user/4

通过 final URI originalUri = request.getURI(); String serviceName = originalUri.getHost();解析获得服务名


3.3 根据服务名获取服务 IP 和 Port 信息

在 org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor 类中重写的 intercept()方法,最后一行代码至关重要,this.requestFactory.createRequest(request, body, execution)为包装 http 请求,不是很重要,最终的是 org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient 类中 execute()方法。

此处的 serviceId 即为服务名 user-service,this.getLoadBalancer(serviceId);会根据服务名从 eureka 中解析中对应的服务地址和端口。

this.getLoadBalancer(serviceId)方法调用了 org.springframework.cloud.netflix.ribbon.SpringClientFactory 类中的 getLoadBalancer()方法,随后调用了 org.springframework.cloud.netflix.ribbon.SpringClientFactory.getInstance()方法,之后调用了其父类 org.springframework.cloud.context.named.NamedContextFactory.getInstance()方法,最终返回 org.springframework.context.annotation.AnnotationConfigApplicationContext,可以看到其实获取的是 spring 容器中的 ILoadBalancer.class 实现类 com.netflix.loadbalancer.DynamicServerListLoadBalancer 实例。

那现在还有最后一个问题,DynamicServerListLoadBalancer 实例中的服务信息是怎么来的呢?这里其实是 Eureka Clinet 从 Eureka Server 中拉取的服务列表。

3.4 根据负载均衡策略发起 http 请求

最后一步就是根据负载均衡策略选择服务提供者发起 http 请求,负载均衡策略的选择在 com.netflix.loadbalancer.ZoneAwareLoadBalancer 的 chooseServer()方法中实现。在选择发起请求的服务之后执行 org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient 中的 execute()方法即完成整个 Ribbon 负载均衡过程。

4、简单源码解析

在 Ribbon 整个源码体系中,ILoadBalancer 接口的类关系图十分重要,因此源码解析也会根据这张图的类关系图来。


4.1 ILoadBalancer

com.netflix.loadbalancer.ILoadBalancer 是一个顶层接口类,该类中定义了几个未实现的方法,具体实现在子类中完成。

方法作用如下所示:

4.2 AbstractLoadBalancer

com.netflix.loadbalancer.AbstractLoadBalancer 是一个抽象类,它实现了 com.netflix.loadbalancer.ILoadBalancer 接口;其源码非常少,如下所示:

public abstract class AbstractLoadBalancer implements ILoadBalancer {        public enum ServerGroup{        ALL,        STATUS_UP,        STATUS_NOT_UP            }
    public Server chooseServer() {     return chooseServer(null);    }
    public abstract List<Server> getServerList(ServerGroup serverGroup);
    public abstract LoadBalancerStats getLoadBalancerStats();    }
复制代码

AbstractLoadBalancer 抽象类中定义类一个 ServerGroup 内部枚举类,ServerGroup 用于标志服务实例的分组类型:

  • ALL 表示所有服务

  • STATUS_UP 表示正常服务

  • STATUS_NOT_UP 表示下线服务

4.3 BaseLoadBalancer

com.netflix.loadbalancer.BaseLoadBalancer 类继承了 com.netflix.loadbalancer.AbstractLoadBalancer,BaseLoadBalancer 类源码比较复杂,但是有几个点是比较重要的。

  • allServerList 用于保存所有服务实例

  • upServerList 用于保存所有在线服务实例

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)protected volatile List<Server> allServerList = Collections        .synchronizedList(new ArrayList<Server>());@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)protected volatile List<Server> upServerList = Collections        .synchronizedList(new ArrayList<Server>());
复制代码
  • 定义负载均衡默认策略为轮询

private final static IRule DEFAULT_RULE = new RoundRobinRule(); protected IRule rule = DEFAULT_RULE;
复制代码
  • IPingStrategy 表示服务检查策略,用于检查服务是否可用;默认的服务检查策略为 SerialPingStrategy,SerialPingStrategy 中的 pingServers 方法就是遍历所有服务实例,一个个发送 ping 请求,查看服务是否有效。

private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
复制代码
  • BaseLoadBalancer 构造函数中启动了一个 PingTask,PingTask 每隔 10 秒钟会 ping 一次服务列表中的服务是否可用,PingTask 中干的事情就是 pingStrategy 服务检查策略。

protected int pingIntervalSeconds = 10;
public BaseLoadBalancer() {    this.name = DEFAULT_NAME;    this.ping = null;    setRule(DEFAULT_RULE);    setupPingTask();    lbStats = new LoadBalancerStats(DEFAULT_NAME);}
void setupPingTask() {    if (canSkipPing()) {        return;    }    if (lbTimer != null) {        lbTimer.cancel();    }    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,            true);    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);    forceQuickPing();}
复制代码

4.4 DynamicServerListLoadBalancer

com.netflix.loadbalancer.DynamicServerListLoadBalancer 类继承了 com.netflix.loadbalancer.BaseLoadBalancer,因此 DynamicServerListLoadBalancer 类主要是对 BaseLoadBalancer 类功能进行扩展,DynamicServerListLoadBalancer 类源码比较复杂,但是有几个点是比较重要的。

  • serverListImpl 是 DynamicServerListLoadBalancer 中声明的 ServerList 类型的变量,ServerList 接口中定义了两个方法

volatile ServerList<T> serverListImpl;
复制代码
  • getInitialListOfServers 方法用于获取所有初始化服务列表

  • getUpdatedListOfServers 方法用于获取更新的服务实例列表

public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();        public List<T> getUpdatedListOfServers();   
}
复制代码
  • ServerList 接口有 5 个实现类,DynamicServerListLoadBalancer 默认实现是 DomainExtractingServerList,但是 DomainExtractingServerList 构造函数中传入的是 DiscoveryEnabledNIWSServerList(可以看我下面 Debug 的图),因此可以看出重点类其实是 DiscoveryEnabledNIWSServerList




  • DiscoveryEnabledNIWSServerList 类中一个比较重要的方法是 obtainServersViaDiscovery 方法,可以从名字看出这是通过注册中心获取服务列表,代码中可以看出依赖 EurekaClient 从服务注册中心中获取具体的服务实例 InstanceInfo

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {            logger.warn("EurekaClient has not been initialized yet, returning an empty list");            return new ArrayList<DiscoveryEnabledServer>();        }
        EurekaClient eurekaClient = eurekaClientProvider.get();        if (vipAddresses!=null){            for (String vipAddress : vipAddresses.split(",")) {                // if targetRegion is null, it will be interpreted as the same region of client                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);                for (InstanceInfo ii : listOfInstanceInfo) {                    if (ii.getStatus().equals(InstanceStatus.UP)) {
                        if(shouldUseOverridePort){                            if(logger.isDebugEnabled()){                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);                            }
                            // copy is necessary since the InstanceInfo builder just uses the original reference,                            // and we don't want to corrupt the global eureka copy of the object which may be                            // used by other clients in our system                            InstanceInfo copy = new InstanceInfo(ii);
                            if(isSecure){                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();                            }else{                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();                            }                        }
                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);                        serverList.add(des);                    }                }                if (serverList.size()>0 && prioritizeVipAddressBasedServers){                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers                }            }        }        return serverList;    }

复制代码
  • DiscoveryEnabledNIWSServerList 类中另一个比较重要点是定义了一个 ServerListUpdater.UpdateAction 更新器,该更新器用于更新服务信息。ServerListUpdater 提供两个实现类 com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater 和 com.netflix.loadbalancer.PollingServerListUpdater;其中 EurekaNotificationServerListUpdater 通过 Eureka 的事件监听机制来更新服务信息;而此处默认的是 PollingServerListUpdater 定时任务更新机制。


  • PollingServerListUpdater 代码中可以看出定时任务延迟启动 initialDelayMs 为 1 秒,刷新频率 refreshIntervalMs 为 30 秒

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;  
public PollingServerListUpdater() {    this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);}
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {    this.initialDelayMs = initialDelayMs;    this.refreshIntervalMs = refreshIntervalMs;}
public synchronized void start(final UpdateAction updateAction) {    if (isActive.compareAndSet(false, true)) {        final Runnable wrapperRunnable = new Runnable() {            @Override            public void run() {                if (!isActive.get()) {                    if (scheduledFuture != null) {                        scheduledFuture.cancel(true);                    }                    return;                }                try {                    updateAction.doUpdate();                    lastUpdated = System.currentTimeMillis();                } catch (Exception e) {                    logger.warn("Failed one update cycle", e);                }            }        };
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(                wrapperRunnable,                initialDelayMs,                refreshIntervalMs,                TimeUnit.MILLISECONDS        );    } else {        logger.info("Already active, no-op");    }}
复制代码
  • 在 DynamicServerListLoadBalancer 定义了一个变量 ServerListFilter,可以看到在 updateListOfServers 方法中,会判断 filter 是否为空,然后对 getUpdatedListOfServers 获取到的服务列表 servers 执行 getFilteredListOfServers 方法,其实就是对服务列表根据 ServerListFilter 接口的实现类逻辑进行过滤。

volatile ServerListFilter<T> filter;
public void updateListOfServers() {    List<T> servers = new ArrayList<T>();    if (serverListImpl != null) {        servers = serverListImpl.getUpdatedListOfServers();        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",                getIdentifier(), servers);
        if (filter != null) {            servers = filter.getFilteredListOfServers(servers);            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",                    getIdentifier(), servers);        }    }    updateAllServerList(servers);}

复制代码
  • ServerListFilter 的实现类如下所示,默认的实现类是 DefaultNIWSServerListFilter,但是 DefaultNIWSServerListFilter 啥也没有,仅仅继承了 ZoneAffinityServerListFilter;因此具体的功能还是在 ZoneAffinityServerListFilter 中实现,而 ZoneAffinityServerListFilter 主要提供的是对服务提供者所处的 Zone 和服务消费者所在的 Zone 进行比较,过滤掉不在一个 Zone 的实例。

4.5 ZoneAwareLoadBalancer

com.netflix.loadbalancer.ZoneAwareLoadBalancer 是 com.netflix.loadbalancer.DynamicServerListLoadBalancer 的唯一子类,在 DynamicServerListLoadBalancer 中还有一个非常重要的方法没有实现,那就是 chooseServer 方法。chooseServer 用于负载均衡器选择服务器进行调用,因此 ZoneAwareLoadBalancer 的出现就是解决这个问题。此外 ZoneAwareLoadBalancer 重写了 setServerListForZones 方法,setServerListForZones 方法 getLoadBalancer(zone)用于创建负载均衡器; existingLBEntry.getValue().setServersList(Collections.emptyList())用于清除不包含 server 的 zone

protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {    super.setServerListForZones(zoneServersMap);    if (balancers == null) {        balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();    }    for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {        String zone = entry.getKey().toLowerCase();        getLoadBalancer(zone).setServersList(entry.getValue());    }    // check if there is any zone that no longer has a server    // and set the list to empty so that the zone related metrics does not    // contain stale data    for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {        if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {            existingLBEntry.getValue().setServersList(Collections.emptyList());        }    }
复制代码


发布于: 刚刚阅读数: 2
用户头像

李子捌

关注

InfoQ签约作者|华为云享专家 |CSDN博客专家 2020.07.20 加入

公众号【李子捌】

评论

发布
暂无评论
Ribbon从入门到源码解析_微服务_李子捌_InfoQ写作平台