容错管理

最近更新时间:2020-08-06 15:14:52

操作场景
KMSE(微服务引擎)使用Resilience4j库,相比Hystrix更轻量,对外部依赖更少。

核心模块

  • resilience4j-circuitbreaker: 断路器
  • resilience4j-ratelimiter: 限流
  • resilience4j-bulkhead: 隔离板
  • resilience4j-retry: 自动重试

前提条件
向工程中添加依赖,在pom.xml中添加以下依赖

<dependency>
  <groupId>com.ksyun.kmse</groupId>
  <artifactId>spring-cloud-kmse-starter-fault-tolerant</artifactId>
  <version>${version}</version>
</dependency>

操作步骤
CircuitBreaker(断路器)使用

  1. 创建配置
    在控制台“服务治理>>容错管理”页面新建容错类型规则,如下图

容错管理

各项对应可配置参数如下:

配置参数 默认值 描述
failureRateThreshold 50 熔断器关闭状态和半开状态使用的同一个失败率阈值
ringBufferSizeInHalfOpenState 10 熔断器半开状态的缓冲区大小,会限制线程的并发量,例如缓冲区为10则每次只会允许10个请求调用后端服务
ringBufferSizeInClosedState 100 熔断器关闭状态的缓冲区大小,不会限制线程的并发量,在熔断器发生状态转换前所有请求都会调用后端服务
waitDurationInOpenState 60(s) 熔断器从打开状态转变为半开状态等待的时间,单位:(ms, s, m)
recordExceptions empty 需要记录为失败的异常列表
ignoreExceptions empty 需要忽略的异常列表
recordFailurePredicate throwable -> true 自定义的谓词逻辑用于判断异常是否需要记录或者需要忽略,默认所有异常都进行记录

对应application.yml配置(本地开发可直接编写配置,以下例子依据此配置)

resilience4j.circuitbreaker.instances:
  backendA:
    failureRateThreshold: 50
    recordFailurePredicate: com.ksyun.exception.RecordFailurePredicate
    ignoreExceptions:
      - com.ksyun.order.exception.BusinessException
    recordExceptions:
      - org.springframework.web.client.HttpServerErrorException
    ringBufferSizeInClosedState: 5
    ringBufferSizeInHalfOpenState: 3
    waitDurationInOpenState: 60s
  1. 调用方法
    CircuitBreaker目前支持两种方式调用,一种是程序式调用,一种是AOP使用注解的方式调用。

注解方式调用
首先在要保护的方法上使用@CircuitBreaker(name="",fallbackMethod="")注解,其中name是要使用的断路器的名称, fallbackMethod是要使用的降级方法,降级方法必须和原方法放在同一个类中,且降级方法的返回值和原方法相同,输入参数需要添加额外的exception参数,示例代码:

  • BusinessService接口定义各类情况的方法
 public interface BusinessService {
      String failure();

      String failureNotAOP() throws Throwable;

      String success();

      String failureWithFallback();

      String ignoreException();

  }
  • 实现类BusinessServiceImpl
  import com.ksyun.exception.BusinessException;
  import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
  import org.springframework.http.HttpStatus;
  import org.springframework.stereotype.Service;
  import org.springframework.web.client.HttpServerErrorException;

  @Service
  public class BusinessServiceImpl implements BusinessService {
    /**
     * 抛出record异常
     * @return
     */
    @Override
    @CircuitBreaker(name = "backendA")
    public String failure() {
        throw new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR, "接口服务异常");
    }

    @Override
    public String failureNotAOP() throws Throwable {
        return null;
    }

    @Override
    public String success() {
        return null;
    }

    /**
     * 带fallback处理
     * @return
     */
    @Override
    @CircuitBreaker(name = "backendA", fallbackMethod = "fallback")
    public String failureWithFallback() {
        return failure();
    }

    /**
     * 降级处理
     * @param ex
     * @return
     */
    private String fallback(HttpServerErrorException ex) {
        return "fallback: Recovered HttpServerErrorException: " + ex.getMessage();
    }

    /**
     * 抛出ignore异常
     * @return
     */
    @CircuitBreaker(name = "backendA", fallbackMethod = "fallback")
    public String ignoreException() {
        throw new BusinessException("This exception is ignored by the CircuitBreaker of backend A");
    }
  }
  • 自定义异常BusinessException
public class BusinessException extends RuntimeException {

      public BusinessException(String message) {
          super(message);
      }
  }
  • 自定义失败谓词,返回true则记录异常,返回false则忽略异常
import java.util.function.Predicate;

  public class RecordFailurePredicate implements Predicate<Throwable> {
      @Override
      public boolean test(Throwable throwable) {
          return !(throwable instanceof BusinessException);
      }
  }

通过一个请求接口模拟各类情况

@RestController
@RequestMapping(value = "/backendA")
public class BackendAController {

    @Autowired
    private BusinessService businessService;

    @GetMapping("failure/{type}")
    public String failure(@PathVariable("type") String type) throws Throwable {
        if ("1".equals(type)){
            return businessService.failure();
        } else if ("2".equals(type)){
            return businessService.failureWithFallback();
        } else if("3".equals(type)){
            return businessService.ignoreException();
        } else if("4".equals(type)) {
            return businessService.failureNotAOP();
        } else {
            return businessService.success();
        }
    }
}
  • 调用接口 (type=1)

容错管理

前5次返回失败错误信息后(配置中缓存区大小为5,缓存区被填满后开始计算失败率),第6次直接返回断路器backenA被打开,之后1分钟内的请求都将快速失败。

  • 调用接口(type=2),带fallback处理

容错管理

第6次断路器backenA被打开,请求被降级至fallback方法中处理。

  • 调用接口(type=3),ignore异常

容错管理

当抛出异常为ignore时,失败次数不会被记入缓冲区,也无法触发断路器开关

  • 断路器打开一分钟之后变为半开状态,调用接口(type=5), 正常响应

容错管理

3次请求成功率超过50%,断路器关闭,否则断路器重新开启

容错管理

(失败率超过50%,断路器重新打开)

程序式调用
使用decorate包装服务的方法,再使用Try.of().recover()进行降级处理。

@Service(value = "businessAService")
public class BusinessAService implements BusinessService {
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;
    @Override
    public String failureNotAOP() throws Throwable {
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
        CheckedFunction0<String> checkedSupplier=CircuitBreaker.decorateCheckedSupplier(circuitBreaker, backendAConnector::failureNotAOP);
        Try<String> result = Try.of(checkedSupplier).recover(CallNotPermittedException.class,   throwable -> {
            Log.info("服务降级");
            return "fallback";
        });
        return result.get();
    }
}

Retry(重试)使用

  1. 创建配置
    在控制台“服务治理>>容错管理”页面新建重试类型规则,如下图:

容错管理

各项对应可配置参数

配置参数 默认值 描述
maxAttempts 3 最大重试次数
waitDuration 500(ms) 固定重试间隔,单位(ms, s, m)
enableExponentialBackoff false 是否允许使用指数退避算法进行重试间隔时间的计算
retryExceptionPredicate throwable -> true 自定义异常重试规则,需要重试的返回true
retryExceptions empty 需要重试的异常列表
ignoreExceptions empty 需要忽略的异常列表

对应application.yml配置(本地开发可直接编写配置,以下例子依据此配置)

resilience4j.retry.instances:
  backendA:
    enableExponentialBackoff: true
    exponentialBackoffMultiplier: 2
    retryExceptionPredicate: com.ksyun.exception.RetryOnExceptionPredicate
    ignoreExceptions:
      - com.ksyun.order.exception.BusinessException
    maxRetryAttempts: 3
    retryExceptions:
      - org.springframework.web.client.HttpServerErrorException
    waitDuration: 5s
  1. 调用方法
    还是以之前的接口为例。Retry支持注解方式和程序式两种方式的调用。

注解方式调用
首先在方法上使用@Retry(name="",fallbackMethod="")注解,其中name是要使用的重试器实例的名称,fallbackMethod是要使用的降级方法:

/**
* 抛出需retry异常
* @return
*/
@Override
@Retry(name = "backendA")
public String failure() {
    throw new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR, "接口服务异常");
}

/**
* 带fallback处理
* @return
*/
@Override
@Retry(name = "backendA", fallbackMethod = "fallback")
public String failureWithFallback() {
    return failure();
}

/**
* 降级处理
* @param ex
* @return
*/
private String fallback(HttpServerErrorException ex) {
    return "this is fallback with ex: " + ex.getMessage();
}

/**
* 抛出ignore异常
* @return
*/
@Override
@Retry(name = "backendA")
public String ignoreException() {
    throw new BusinessException("This exception is ignored by the CircuitBreaker of backend A");
}
  • 自定义重试异常谓词(需要重试的异常返回true,不需要则返回false)
import java.util.function.Predicate;

public class RetryExceptionPredicate implements Predicate<Throwable> {
    @Override
    public boolean test(Throwable throwable) {
        return !(throwable instanceof BusinessException);
    }
}
  • 调用接口(type=1)

容错管理

因为抛出异常为retryExceptions,所以请求重试了3次,等待时间为5s,最终耗时15s

  • 调用接口(type=2)

容错管理

重试之后仍抛出异常,则服务降级,调用fallback方法

  • 调用接口(type=3)

容错管理

因抛出异常为ignoreExceptions,所以没有进行重试

程序式调用

@Service(value = "businessAService")
public class BusinessAService implements BusinessService {
    @Autowired
    private RetryRegistry retryRegistry;

    @Override
    public String failureNotAOP() throws Throwable {
        Retry retry = retryRegistry.retry("backendA");
        CheckedFunction0<String> checkedSupplier = Retry.decorateCheckedSupplier(retry, backendAConnector::failureNotAOP);
        Try<String> result = Try.of(checkedSupplier).recover(CallNotPermittedException.class, throwable -> {
            Log.info("服务降级");
            return "fallback";
        });
        return result.get();
    }
}

Retry可以与CircuitBreaker 一起使用有2种调用方式,一种是先用重试组件装饰,再用熔断器装饰,这时熔断器的失败需要等重试结束才计算,另一种是先用熔断器装饰,再用重试组件装饰,这时每次调用服务都会记录进熔断器的缓冲环中,需要注意的是,第二种方式需要把CallNotPermittedException放进重试组件的白名单中,因为熔断器打开时重试是没有意义的。

@Override
public String failureNotAOP() throws Throwable {
    CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("backendA");
    Retry retry = retryRegistry.retry("backendA");
    // 先用重试组件包装,再用熔断器包装
    CheckedFunction0<String> retrySupplier= Retry.decorateCheckedSupplier(retry, backendAConnector::failureNotAOP);
    CheckedFunction0<String> checkedSupplier =CircuitBreaker.decorateCheckedSupplier(circuitBreaker, retrySupplier);
    // 使用Try.of().recover()调用并进行降级处理
     ry<String> result = Try.of(checkedSupplier).recover(CallNotPermittedException.class, throwable -> {
        Log.info("断路器打开");
        return "fallback";
    });
    return result.get();
}

但是需要注意同时注解重试组件和熔断器的话,是按照第二种方案,即每一次请求都会被熔断器记录。

/**
* 抛出record异常
* @return
*/
@Override
@Retry(name = "backendA")
@CircuitBreaker(name = "backendA", fallbackMethod = "fallback")
public String failure() {
    throw new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR, "接口服务异常");
}

RateLimter(限流)使用

  1. 创建配置
    在控制台“服务治理>>容错管理”页面新建限流类型规则,如下图:

容错管理

各项对应可配置参数

配置参数 默认值 描述
timeoutDuration 5[s] 线程等待权限的默认等待时间,单位(ms, s, m)
limitRefreshPeriod 500[ns] 单位时间:权限刷新的时间,每个周期结束后,RateLimiter将会把权限计数设置为limitForPeriod的值
limiteForPeriod 50 单位时间通过量:一个限制刷新期间的可用权限数

对应application.yml配置(本地开发可直接编写配置,以下例子依据此配置)

resilience4j.ratelimiter.instances:
  backendA:
    limitForPeriod: 5
    limitRefreshPeriod: 20s
    timeoutDuration: 5s
  1. 调用方式
    RateLimter目前支持两种方式调用,一种是程序式调用,一种是AOP使用注解的方式调用。
    注解方式调用
    调用一个正常接口,不抛出异常,为了让结果明显一些,程序中sleep5秒。
@Override
@RateLimiter(name="backendA")
public String success() {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello World from backend A";
}

ab:Apache Bench,一个web压测工具,用来模拟并发,参考: http://httpd.apache.org/docs/2.4/programs/ab.html

用ab工具,同时发送5个请求,可以看到4个失败,只有1个成功。

容错管理

修改配置limitRefreshPeriod: 1s,重新测试,全部成功。可以看出即使服务还没完成,依然可以放入,只与时间有关,与线程无关。

容错管理

程序式调用
调用方法与前面类似,装饰方法之后用Try.of().recover()来执行

@Service(value = "businessAService")
public class BusinessAService implements BusinessService {
    @Autowired
    private RateLimiterRegistry rateLimiterRegistry;

    @Override
    public String failureNotAOP() throws Throwable {
        RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("name");
        CheckedFunction0<String> ratelimiterSupplier = RateLimiter.decorateCheckedSupplier(rateLimiter, backendAConnector::failureNotAOP);
        Try<String> result = Try.of(ratelimiterSupplier).recover(CallNotPermittedException.class, throwable -> {
            Log.info("服务降级");
            return "fallback";
        });
        return result.get();
    }
}

Bulkhead(隔离板)使用
Resilence4j的Bulkhead提供两种实现,一种是基于信号量的,另一种是基于有等待队列的固定大小的线程的,由于基于信号量的Bulkhead能很好地在多线程和I/O模型下工作,所以推荐基于信号量的Bulkhead的使用。

  1. 创建配置

容错管理

各项对应可配置参数

配置参数 默认值 描述
maxConcurrentCalls 25 可允许的最大并发线程数
maxWaitDuration 0 尝试进入饱和舱壁时应阻止线程的最大时间

对应application.yml配置(本地开发可直接编写配置,以下例子依据此配置)

resilience4j.bulkhead.instances:
  backendA:
    maxConcurrentCalls: 2
    maxWaitDuration: 100
  1. 调用方式
    Bulkhead目前支持两种方式调用,一种是程序式调用,一种是AOP使用注解的方式调用
    注解方式调用
    首先在连接器方法上使用@Bulkhead(name="", fallbackMethod="", type="")注解,其中name是要使用的Bulkhead实例的名称,fallbackMethod是要使用的降级方法,type是选择信号量或线程池的Bulkhead
@Override
@Bulkhead(name="backendA", type = Type.SEMAPHORE)
public String success() {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello World from backend A";
}

用ab测试工具模拟并发请求,50个请求48个失败,只有2次成功。

容错管理

当最大并发达到上限时,此时请求将抛出异常。

容错管理

若有fallback方法,则服务降级至fallback中处理。

容错管理

程序式调用
调用方法都类似,装饰方法之后用Try.of().recover()来执行

@Service(value = "businessAService")
public class BusinessAService implements BusinessService {
    @Autowired
    private BulkheadRegistry bulkheadRegistry;

    @Override
    public String failureNotAOP() throws Throwable {
        Bulkhead bulkhead = bulkheadRegistry.bulkhead("backendA");
        CheckedFunction0<String> bulkheadSupplier = Bulkhead.decorateCheckedSupplier(bulkhead, backendAConnector::failureNotAOP);
        Try<String> result = Try.of(bulkheadSupplier).recover(CallNotPermittedException.class, throwable -> {
            Log.info("服务降级");
            return "fallback";
        });
        return result.get();
    }
}

注意
如果Retry、CircuitBreaker、Bulkhead、RateLimiter同时注解在方法上,默认的顺序是Bulkhead->RateLimiter->CircuitBreaker->Retry,即先控制并发再限流然后熔断最后重试。

金山云,开启您的云计算之旅

免费注册