我们在项目中添加 Spring Cloud Sentinel 依赖添加后 spring-cloud-starter-alibaba-sentinel 在 Spring-Boot 启动的过程中回去初始化 spring.factories 中的配置信息,如:SentinelWebAutoConfiguration 、SentinelAutoConfiguration 等配置文件来初始化
再讲代码之前我先声明一下我的版本号sentinel 1.8.0 。后续的所有内容均基于该版本进行
@ResoureSetinel 工作原理
配置流控规则我们最简单的方式就是通过 @ResoureSetinel 的方式来管理,该注解可以直接定义流控规则、降级规则。下面是一个简单的使用例子:
- @SentinelResource(value="ResOrderGet",
- fallback="fallback",
- fallbackClass=SentinelResourceExceptionHandler.class,
- blockHandler="blockHandler",
- blockHandlerClass=SentinelResourceExceptionHandler.class
- )
- @GetMapping("/order/get/{id}")
- publicCommonResult<StockModel>getStockDetails(@PathVariableIntegerid){
- StockModelstockModel=newStockModel();
- stockModel.setCode("STOCK==>1000");
- stockModel.setId(id);
- returnCommonResult.success(stockModel);
- }
如果大家熟悉 Spring 相关的组件大家都可以想到,这里多半是通过Spring Aop. 的方式来拦截 getStockDetails 方法。我们先看看SentinelAutoConfiguration 配置文件,我们可以找到 SentinelResourceAspect Bean 的定义方法。
- @Bean
- @ConditionalOnMissingBean
- publicSentinelResourceAspectsentinelResourceAspect(){
- returnnewSentinelResourceAspect();
- }
让后我们再来看看 SentinelResourceAspect 具体是怎么处理的,源码如下:
- //定义Pointcut
- @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
- publicvoidsentinelResourceAnnotationPointcut(){
- }
- //Around来对被标记@SentinelResource注解的方法进行处理
- @Around("sentinelResourceAnnotationPointcut()")
- publicObjectinvokeResourceWithSentinel(ProceedingJoinPointpjp)throwsThrowable{
- MethodoriginMethod=resolveMethod(pjp);
- //获取注解信息
- SentinelResourceannotation=originMethod.getAnnotation(SentinelResource.class);
- //获取资源名称
- StringresourceName=getResourceName(annotation.value(),originMethod);
- EntryTypeentryType=annotation.entryType();
- intresourceType=annotation.resourceType();
- Entryentry=null;
- try{
- //执行entry
- entry=SphU.entry(resourceName,resourceType,entryType,pjp.getArgs());
- //执行业务方法
- Objectresult=pjp.proceed();
- //返回
- returnresult;
- }catch(BlockExceptionex){
- //处理BlockException
- returnhandleBlockException(pjp,annotation,ex);
- }catch(Throwableex){
- Class<?extendsThrowable>[]exceptionsToIgnore=annotation.exceptionsToIgnore();
- //Theignorelistwillbecheckedfirst.
- if(exceptionsToIgnore.length>0&&exceptionBelongsTo(ex,exceptionsToIgnore)){
- throwex;
- }
- if(exceptionBelongsTo(ex,annotation.exceptionsToTrace())){
- traceException(ex);
- //处理降级
- returnhandleFallback(pjp,annotation,ex);
- }
- //Nofallbackfunctioncanhandletheexception,sothrowitout.
- throwex;
- }
- }
我们总结一下, @SentinelResource 的执行过程, 首先是通过 Aop 进行拦截,然后通过 SphU.entry 执行对应的流控规则,最后调用业务方法。如果触发流控规则首先处理流控异常 BlockException 然后在判断是否有服务降级的处理,如果有就调用 fallback 方法。通过 handleBlockException 、handleFallback 进行处理。
责任链模式处理流控
通过上面的梳理,我们知道对于流控的过程,核心处理方法就是 SphU.entry 。在这个方法中其实主要就是初始化流控 Solt 和执行 Solt. 在这个过程中会对:簇点定义、流量控制、熔断降级、系统白名单等页面功能进行处理。
1. 初始化责任链
下面是初始化 Solt 的核心代码在 SphU.entryWithPriority
- //删减部分代码
- privateEntryentryWithPriority(ResourceWrapperresourceWrapper,intcount,booleanprioritized,Object…args)
- throwsBlockException{
- //初始化责任链
- ProcessorSlot<Object>chain=lookProcessChain(resourceWrapper);
- Entrye=newCtEntry(resourceWrapper,chain,context);
- try{
- //执行entry
- chain.entry(context,resourceWrapper,null,count,prioritized,args);
- }catch(BlockExceptione1){
- e.exit(count,args);
- //异常抛出,让SentinelResourceAspect.invokeResourceWithSentinel统一处理
- throwe1;
- }catch(Throwablee1){
- //Thisshouldnothappen,unlessthereareerrorsexistinginSentinelinternal.
- RecordLog.info("Sentinelunexpectedexception",e1);
- }
- returne;
- }
通过 lookProcessChain 方法我逐步的查找,我们可以看到最终的责任链初始化类,默认是 DefaultSlotChainBuilder
- publicclassDefaultSlotChainBuilderimplementsSlotChainBuilder{
- @Override
- publicProcessorSlotChainbuild(){
- ProcessorSlotChainchain=newDefaultProcessorSlotChain();
- //Note:theinstancesofProcessorSlotshouldbedifferent,sincetheyarenotstateless.
- //通过SPI去加载所有的ProcessorSlot实现,通过Order排序
- List<ProcessorSlot>sortedSlotList=SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
- for(ProcessorSlotslot:sortedSlotList){
- if(!(slotinstanceofAbstractLinkedProcessorSlot)){
- RecordLog.warn("TheProcessorSlot("+slot.getClass().getCanonicalName()+")isnotaninstanceofAbstractLinkedProcessorSlot,can'tbeaddedintoProcessorSlotChain");
- continue;
- }
- //添加到chain尾部
- chain.addLast((AbstractLinkedProcessorSlot<?>)slot);
- }
- returnchain;
- }
- }
2. 责任链的处理过程
我们可以通过断点的方式来查看在 sortedSlotList 集合中所有的 solt 顺序如下图所示:
我们可以通过如下的顺序进行逐个的简单的分析一下
- NodeSelectorSolt
- CusterBuilderSolt
- LogSlot
- StatisicSlot
- AuthoritySolt
- SystemSolts
- ParamFlowSolt
- FlowSolt
- DegradeSlot
对于 Sentinel 的 Slot 流控协作流程可以参考官方给出的文档, 如下图所示:
FlowSolt 流控
通过 NodeSelectorSolt、CusterBuilderSolt、StatisicSlot 等一系列的请求数据处理,在 FlowSolt会进入流控规则,所有的 Solt 都会执行 entry 方法, 如下所示
- //FlowSolt的entry方法
- @Override
- publicvoidentry(Contextcontext,ResourceWrapperresourceWrapper,DefaultNodenode,intcount,
- booleanprioritized,Object…args)throwsThrowable{
- //检查流量
- checkFlow(resourceWrapper,context,node,count,prioritized);
- fireEntry(context,resourceWrapper,node,count,prioritized,args);
- }
在后续的流程中,会执进行判断具体的流控策略,默认是快速失败,会执行 DefaultController 方法。
- //DefaultController
- @Override
- publicbooleancanPass(Nodenode,intacquireCount,booleanprioritized){
- //当前资源的调用次数
- intcurCount=avgUsedTokens(node);
- //当前资源的调用次数+1>当前阈值
- if(curCount+acquireCount>count){
- //删减比分代码
- //不通过
- returnfalse;
- }
- //通过
- returntrue;
- }
- privateintavgUsedTokens(Nodenode){
- if(node==null){
- returnDEFAULT_AVG_USED_TOKENS;
- }
- returngrade==RuleConstant.FLOW_GRADE_THREAD?node.curThreadNum():(int)(node.passQps());
- }
如果上面返回不通过会回到,那么会抛出 FlowException
- publicvoidcheckFlow(Function<String,Collection<FlowRule>>ruleProvider,ResourceWrapperresource,
- Contextcontext,DefaultNodenode,intcount,booleanprioritized)throwsBlockException{
- if(ruleProvider==null||resource==null){
- return;
- }
- Collection<FlowRule>rules=ruleProvider.apply(resource.getName());
- if(rules!=null){
- for(FlowRulerule:rules){
- if(!canPassCheck(rule,context,node,count,prioritized)){
- //流控规则不通过,会抛出FlowException
- thrownewFlowException(rule.getLimitApp(),rule);
- }
- }
- }
- }
然后会在 StatisticSlot 中增加统计信息, 最后会抛出给 SentinelResourceAspect 进行处理,完成流控功能。我们再来看看这个异常信息,如果是BlockException 异常,会进入 handleBlockException 方法处理, 如果是其他的业务异常首先会判断是否有配置 fallback 处理如果有,就调用 handleFallback 没有就继续往外抛,至此完成流控功能
- try{
- entry=SphU.entry(resourceName,resourceType,entryType,pjp.getArgs());
- Objectresult=pjp.proceed();
- returnresult;
- }catch(BlockExceptionex){
- returnhandleBlockException(pjp,annotation,ex);
- }catch(Throwableex){
- Class<?extendsThrowable>[]exceptionsToIgnore=annotation.exceptionsToIgnore();
- //Theignorelistwillbecheckedfirst.
- if(exceptionsToIgnore.length>0&&exceptionBelongsTo(ex,exceptionsToIgnore)){
- throwex;
- }
- if(exceptionBelongsTo(ex,annotation.exceptionsToTrace())){
- traceException(ex);
- returnhandleFallback(pjp,annotation,ex);
- }
- //Nofallbackfunctioncanhandletheexception,sothrowitout.
- throwex;
- }
DegradeSlot 降级
断路器的作用是当某些资源一直出现故障时,将触发断路器。断路器不会继续访问已经发生故障的资源,而是拦截请求并返回故障信号。
Sentinel 在 DegradeSlot 这个 Slot 中实现了熔断降级的功能,它有三个状态 OPEN 、HALF_OPEN、CLOSED 以ResponseTimeCircuitBreaker RT 响应时间维度来分析, 断路器工作的过程。下面是一个标准断路器的工作流程:
在 Sentinel 实现的源码过程如下图所示:
Sentinel 通过 Web 拦截器
Sentinel 在默认情况下, 不使用 @ResourceSentinel 注解实现流控的时候, Sentinel 通过拦截器进行流控实现的。初始化类在 SentinelWebAutoConfiguration 它实现了 WebMvcConfigurer 接口,在 sentinelWebInterceptor 方法初始化 SentinelWebInterceptor 等 Bean。
- @Bean
- @ConditionalOnProperty(name="spring.cloud.sentinel.filter.enabled",
- matchIfMissing=true)
- publicSentinelWebInterceptorsentinelWebInterceptor(
- SentinelWebMvcConfigsentinelWebMvcConfig){
- returnnewSentinelWebInterceptor(sentinelWebMvcConfig);
- }
我们在 SentinelWebInterceptor 的核心方法 preHandle 中处理,这里面我们又可以看到 SphU.entry 熟悉的过程调用流控的责任链。由于逻辑都类似,此处不再多说。代码如下:
- publicbooleanpreHandle(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler)
- throwsException{
- try{
- StringresourceName=getResourceName(request);
- if(StringUtil.isEmpty(resourceName)){
- returntrue;
- }
- if(increaseReferece(request,this.baseWebMvcConfig.getRequestRefName(),1)!=1){
- returntrue;
- }
- //Parsetherequestoriginusingregisteredoriginparser.
- Stringorigin=parseOrigin(request);
- StringcontextName=getContextName(request);
- ContextUtil.enter(contextName,origin);
- Entryentry=SphU.entry(resourceName,ResourceTypeConstants.COMMON_WEB,EntryType.IN);
- request.setAttribute(baseWebMvcConfig.getRequestAttributeName(),entry);
- returntrue;
- }catch(BlockExceptione){
- try{
- handleBlockException(request,response,e);
- }finally{
- ContextUtil.exit();
- }
- returnfalse;
- }
- }
参考文档
https://github.com/alibaba/Sentinel/wiki
https://martinfowler.com/bliki/CircuitBreaker.html
原文链接:https://mp.weixin.qq.com/s/88ZnMVwvMKVk_uWTOvK-qg






