什么是Rule
Prometheus支持用户自定义Rule规则。Rule分为两类,一类是Recording Rule,另一类是Alerting Rule。Recording Rule的主要目的是通过PromQL可以实时对Prometheus中采集到的样本数据进行查询,聚合以及其它各种运算操作。而在某些PromQL较为复杂且计算量较大时,直接使用PromQL可能会导致Prometheus响应超时的情况。这时需要一种能够类似于后台批处理的机制能够在后台完成这些复杂运算的计算,对于使用者而言只需要查询这些运算结果即可。Prometheus通过Recoding Rule规则支持这种后台计算的方式,可以实现对复杂查询的性能优化,提高查询效率。
今天主要带来告警规则的分析。Prometheus中的告警规则允许你基于PromQL表达式定义告警触发条件,Prometheus后端对这些触发规则进行周期性计算,当满足触发条件后则会触发告警通知。
什么是告警Rule
告警是prometheus的一个重要功能,接下来从源码的角度来分析下告警的执行流程。
怎么定义告警Rule
- groups:
- –name:example
- rules:
- -alert:HighErrorRate
- #指标需要在触发告警之前的10分钟内大于0.5。
- expr:job:request_latency_seconds:mean5m{job="myjob"}>0.5
- for:10m
- labels:
- severity:page
- annotations:
- summary:Highrequestlatency
- description:descriptioninfo
在告警规则文件中,我们可以将一组相关的规则设置定义在一个group下。在每一个group中我们可以定义多个告警规则(rule)。一条告警规则主要由以下几部分组成:
- alert:告警规则的名称。
- expr:基于PromQL表达式告警触发条件,用于计算是否有时间序列满足该条件。
- for:评估等待时间,可选参数。用于表示只有当触发条件持续一段时间后才发送告警。在等待期间新产生告警的状态为pending。
- labels:自定义标签,允许用户指定要附加到告警上的一组附加标签。
- annotations:用于指定一组附加信息,比如用于描述告警详细信息的文字等,annotations的内容在告警产生时会一同作为参数发送到Alertmanager。
Rule管理器
规则管理器会根据配置的规则,基于规则PromQL表达式告警的触发条件,用于计算是否有时间序列满足该条件。在满足该条件时,将告警信息发送给告警服务。
- typeManagerstruct{
- opts*ManagerOptions//外部的依赖
- groupsmap[string]*Group//当前的规则组
- mtxsync.RWMutex//规则管理器读写锁
- blockchanstruct{}
- donechanstruct{}
- restoredbool
- loggerlog.Logger
- }
- opts(*ManagerOptions类型):记录了Manager实例使用到的其他模块,例如storage模块、notify模块等。
- groups(map[string]*Group类型):记录了所有的rules.Group实例,其中key由rules.Group的名称及其所在的配置文件构成。
- mtx(sync.RWMutex类型):在读写groups字段时都需要获取该锁进行同步。
读取Rule组配置
在Prometheus Server启动的过程中,首先会调用Manager.Update()方法加载Rule配置文件并进行解析,其大致流程如下。
- 调用Manager.LoadGroups()方法加载并解析Rule配置文件,最终得到rules.Group实例集合。
- 停止原有的rules.Group实例,启动新的rules.Group实例。其中会为每个rules.Group实例启动一个goroutine,它会关联rules.Group实例下的全部PromQL查询。
- func(m*Manager)Update(intervaltime.Duration,files[]string,externalLabelslabels.Labels,externalURLstring)error{
- m.mtx.Lock()
- deferm.mtx.Unlock()
- //从当前文件中加载规则
- groups,errs:=m.LoadGroups(interval,externalLabels,externalURL,files…)
- iferrs!=nil{
- for_,e:=rangeerrs{
- level.Error(m.logger).Log("msg","loadinggroupsfailed","err",e)
- }
- returnerrors.New("errorloadingrules,previousrulesetrestored")
- }
- m.restored=true
- varwgsync.WaitGroup
- //循环遍历规则组
- for_,newg:=rangegroups{
- //Ifthereisanoldgroupwiththesameidentifier,
- //checkifnewgroupequalswiththeoldgroup,ifyesthenskipit.
- //Ifnotequals,stopitandwaitforittofinishthecurrentiteration.
- //Thencopyitintothenewgroup.
- //根据新的rules.Group的信息获取规则组名
- gn:=GroupKey(newg.file,newg.name)
- //根据规则组名获取到老的规则组并删除原有的rules.Group实例
- oldg,ok:=m.groups[gn]
- delete(m.groups,gn)
- ifok&&oldg.Equals(newg){
- groups[gn]=oldg
- continue
- }
- wg.Add(1)
- //为每一个rules.Group实例启动一个goroutine
- gofunc(newg*Group){
- ifok{
- oldg.stop()
- //将老的规则组中的状态信息复制到新的规则组
- newg.CopyState(oldg)
- }
- wg.Done()
- //Waitwithstartingevaluationuntiltherulemanager
- //istoldtorun.Thisisnecessarytoavoidrunning
- //queriesagainstabootstrappingstorage.
- <-m.block
- //调用rules.Group.run()方法,开始周期性的执行PromQl语句
- newg.run(m.opts.Context)
- }(newg)
- }
- //Stopremainingoldgroups.
- //停止所有老规则组的服务
- wg.Add(len(m.groups))
- forn,oldg:=rangem.groups{
- gofunc(nstring,g*Group){
- g.markStale=true
- g.stop()
- ifm:=g.metrics;m!=nil{
- m.IterationsMissed.DeleteLabelValues(n)
- m.IterationsScheduled.DeleteLabelValues(n)
- m.EvalTotal.DeleteLabelValues(n)
- m.EvalFailures.DeleteLabelValues(n)
- m.GroupInterval.DeleteLabelValues(n)
- m.GroupLastEvalTime.DeleteLabelValues(n)
- m.GroupLastDuration.DeleteLabelValues(n)
- m.GroupRules.DeleteLabelValues(n)
- m.GroupSamples.DeleteLabelValues((n))
- }
- wg.Done()
- }(n,oldg)
- }
- wg.Wait()
- //更新规则管理器中的规则组
- m.groups=groups
- returnnil
- }
运行Rule组调度方法
规则组启动流程(Group.run):进入Group.run方法后先进行初始化等待,以使规则的运算时间在同一时刻,周期为g.interval;然后定义规则运算调度方法:iter,调度周期为g.interval;在iter方法中调用g.Eval方法执行下一层次的规则运算调度。
规则运算的调度周期g.interval,由prometheus.yml配置文件中global中的 [ evaluation_interval:| default = 1m ]指定。实现如下:
- func(g*Group)run(ctxcontext.Context){
- deferclose(g.terminated)
- //Waitaninitialamounttohaveconsistentlyslottedintervals.
- evalTimestamp:=g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
- select{
- case<-time.After(time.Until(evalTimestamp))://初始化等待
- case<-g.done:
- return
- }
- ctx=promql.NewOriginContext(ctx,map[string]interface{}{
- "ruleGroup":map[string]string{
- "file":g.File(),
- "name":g.Name(),
- },
- })
- //定义规则组规则运算调度算法
- iter:=func(){
- g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file,g.name)).Inc()
- start:=time.Now()
- //规则运算的入口
- g.Eval(ctx,evalTimestamp)
- timeSinceStart:=time.Since(start)
- g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
- g.setEvaluationTime(timeSinceStart)
- g.setLastEvaluation(start)
- }
- //Theassumptionhereisthatsincethetickerwasstartedafterhaving
- //waitedfor`evalTimestamp`topass,thetickswilltriggersoon
- //aftereach`evalTimestamp+N*g.interval`occurrence.
- tick:=time.NewTicker(g.interval)//设置规则运算定时器
- defertick.Stop()
- deferfunc(){
- if!g.markStale{
- return
- }
- gofunc(nowtime.Time){
- for_,rule:=rangeg.seriesInPreviousEval{
- for_,r:=rangerule{
- g.staleSeries=append(g.staleSeries,r)
- }
- }
- //Thatcanbegarbagecollectedatthispoint.
- g.seriesInPreviousEval=nil
- //Waitfor2intervalstogivetheopportunitytorenamedrules
- //toinsertnewseriesinthetsdb.Atthispointifthereisa
- //renamedrule,itshouldalreadybestarted.
- select{
- case<-g.managerDone:
- case<-time.After(2*g.interval):
- g.cleanupStaleSeries(ctx,now)
- }
- }(time.Now())
- }()
- //调用规则组规则运算的调度方法
- iter()
- ifg.shouldRestore{
- //Ifwehavetorestore,wewaitforanotherEvaltofinish.
- //Thereasonbehindthisis,duringfirsteval(orbeforeit)
- //wemightnothaveenoughdatascraped,andrecordingruleswouldnot
- //haveupdatedthelatestvalues,onwhichsomealertsmightdepend.
- select{
- case<-g.done:
- return
- case<-tick.C:
- missed:=(time.Since(evalTimestamp)/g.interval)-1
- ifmissed>0{
- g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file,g.name)).Add(float64(missed))
- g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file,g.name)).Add(float64(missed))
- }
- evalTimestamp=evalTimestamp.Add((missed+1)*g.interval)
- iter()
- }
- g.RestoreForState(time.Now())
- g.shouldRestore=false
- }
- for{
- select{
- case<-g.done:
- return
- default:
- select{
- case<-g.done:
- return
- case<-tick.C:
- missed:=(time.Since(evalTimestamp)/g.interval)-1
- ifmissed>0{
- g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file,g.name)).Add(float64(missed))
- g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file,g.name)).Add(float64(missed))
- }
- evalTimestamp=evalTimestamp.Add((missed+1)*g.interval)
- //调用规则组规则运算的调度方法
- iter()
- }
- }
- }
- }
运行Rule调度方法
规则组对具体规则的调度在Group.Eval中实现,在Group.Eval方法中会将规则组下的每条规则通过QueryFunc将(promQL)放到查询引擎(queryEngine)中执行,如果被执行的是AlertingRule类型,那么执行结果指标会被NotifyFunc组件发送给告警服务;如果是RecordingRule类型,最后将改结果指标存储到Prometheus的储存管理器中,并对过期指标进行存储标记处理。
- //Evalrunsasingleevaluationcycleinwhichallrulesareevaluatedsequentially.
- func(g*Group)Eval(ctxcontext.Context,tstime.Time){
- varsamplesTotalfloat64
- 遍历当前规则组下的所有规则
- fori,rule:=rangeg.rules{
- select{
- case<-g.done:
- return
- default:
- }
- func(iint,ruleRule){
- sp,ctx:=opentracing.StartSpanFromContext(ctx,"rule")
- sp.SetTag("name",rule.Name())
- deferfunc(ttime.Time){
- sp.Finish()
- //更新服务指标-规则的执行时间
- since:=time.Since(t)
- g.metrics.EvalDuration.Observe(since.Seconds())
- rule.SetEvaluationDuration(since)
- //记录本次规则执行的耗时
- rule.SetEvaluationTimestamp(t)
- }(time.Now())
- //记录规则运算的次数
- g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(),g.Name())).Inc()
- //运算规则
- vector,err:=rule.Eval(ctx,ts,g.opts.QueryFunc,g.opts.ExternalURL)
- iferr!=nil{
- //规则出现错误后,终止查询
- rule.SetHealth(HealthBad)
- rule.SetLastError(err)
- //记录查询失败的次数
- g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(),g.Name())).Inc()
- //Canceledqueriesareintentionalterminationofqueries.Thisnormally
- //happensonshutdownandthusweskiploggingofanyerrorshere.
- if_,ok:=err.(promql.ErrQueryCanceled);!ok{
- level.Warn(g.logger).Log("msg","Evaluatingrulefailed","rule",rule,"err",err)
- }
- return
- }
- samplesTotal+=float64(len(vector))
- //判断是否是告警类型规则
- ifar,ok:=rule.(*AlertingRule);ok{
- 发送告警
- ar.sendAlerts(ctx,ts,g.opts.ResendDelay,g.interval,g.opts.NotifyFunc)
- }
- var(
- numOutOfOrder=0
- numDuplicates=0
- )
- //此处为Recording获取存储器指标
- app:=g.opts.Appendable.Appender(ctx)
- seriesReturned:=make(map[string]labels.Labels,len(g.seriesInPreviousEval[i]))
- deferfunc(){
- iferr:=app.Commit();err!=nil{
- rule.SetHealth(HealthBad)
- rule.SetLastError(err)
- g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(),g.Name())).Inc()
- level.Warn(g.logger).Log("msg","Rulesampleappendingfailed","err",err)
- return
- }
- g.seriesInPreviousEval[i]=seriesReturned
- }()
- for_,s:=rangevector{
- if_,err:=app.Append(0,s.Metric,s.T,s.V);err!=nil{
- rule.SetHealth(HealthBad)
- rule.SetLastError(err)
- switcherrors.Cause(err){
- 储存指标返回的各种错误码处理
- casestorage.ErrOutOfOrderSample:
- numOutOfOrder++
- level.Debug(g.logger).Log("msg","Ruleevaluationresultdiscarded","err",err,"sample",s)
- casestorage.ErrDuplicateSampleForTimestamp:
- numDuplicates++
- level.Debug(g.logger).Log("msg","Ruleevaluationresultdiscarded","err",err,"sample",s)
- default:
- level.Warn(g.logger).Log("msg","Ruleevaluationresultdiscarded","err",err,"sample",s)
- }
- }else{
- //缓存规则运算后的结果指标
- seriesReturned[s.Metric.String()]=s.Metric
- }
- }
- ifnumOutOfOrder>0{
- level.Warn(g.logger).Log("msg","Erroroningestingout-of-orderresultfromruleevaluation","numDropped",numOutOfOrder)
- }
- ifnumDuplicates>0{
- level.Warn(g.logger).Log("msg","Erroroningestingresultsfromruleevaluationwithdifferentvaluebutsametimestamp","numDropped",numDuplicates)
- }
- formetric,lset:=rangeg.seriesInPreviousEval[i]{
- if_,ok:=seriesReturned[metric];!ok{
- //设置过期指标的指标值
- //Seriesnolongerexposed,markitstale.
- _,err=app.Append(0,lset,timestamp.FromTime(ts),math.Float64frombits(value.StaleNaN))
- switcherrors.Cause(err){
- casenil:
- casestorage.ErrOutOfOrderSample,storage.ErrDuplicateSampleForTimestamp:
- //Donotcounttheseinlogging,asthisisexpectedifseries
- //isexposedfromadifferentrule.
- default:
- level.Warn(g.logger).Log("msg","Addingstalesamplefailed","sample",metric,"err",err)
- }
- }
- }
- }(i,rule)
- }
- ifg.metrics!=nil{
- g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(),g.Name())).Set(samplesTotal)
- }
- g.cleanupStaleSeries(ctx,ts)
- }
然后就是规则的具体执行了,我们这里先只看AlertingRule的流程。首先看下AlertingRule的结构:
- //AnAlertingRulegeneratesalertsfromitsvectorexpression.
- typeAlertingRulestruct{
- //Thenameofthealert.
- namestring
- //Thevectorexpressionfromwhichtogeneratealerts.
- vectorparser.Expr
- //Thedurationforwhichalabelsetneedstopersistintheexpression
- //outputvectorbeforeanalerttransitionsfromPendingtoFiringstate.
- holdDurationtime.Duration
- //Extralabelstoattachtotheresultingalertsamplevectors.
- labelslabels.Labels
- //Non-identifyingkey/valuepairs.
- annotationslabels.Labels
- //Externallabelsfromtheglobalconfig.
- externalLabelsmap[string]string
- //trueifoldstatehasbeenrestored.WestartpersistingsamplesforALERT_FOR_STATE
- //onlyaftertherestoration.
- restoredbool
- //Protectsthebelow.
- mtxsync.Mutex
- //Timeinsecondstakentoevaluaterule.
- evaluationDurationtime.Duration
- //Timestampoflastevaluationofrule.
- evaluationTimestamptime.Time
- //Thehealthofthealertingrule.
- healthRuleHealth
- //Thelasterrorseenbythealertingrule.
- lastErrorerror
- //Amapofalertswhicharecurrentlyactive(PendingorFiring),keyedby
- //thefingerprintofthelabelsettheycorrespondto.
- activemap[uint64]*Alert
- loggerlog.Logger
- }
这里比较重要的就是active字段了,它保存了执行规则后需要进行告警的资源,具体是否告警还要执行一系列的逻辑来判断是否满足告警条件。具体执行的逻辑如下:
- func(r*AlertingRule)Eval(ctxcontext.Context,tstime.Time,queryQueryFunc,externalURL*url.URL)(promql.Vector,error){
- res,err:=query(ctx,r.vector.String(),ts)
- iferr!=nil{
- r.SetHealth(HealthBad)
- r.SetLastError(err)
- returnnil,err
- }
- //……
- }
这一步通过创建Manager时传入的QueryFunc函数执行规则配置中的expr表达式,然后得到返回的结果,这里的结果是满足表达式的指标的集合。比如配置的规则为:
- cpu_usage>90
那么查出来的结果可能是
- cpu_usage{instance="192.168.0.11"}91
- cpu_usage{instance="192.168.0.12"}92
然后遍历查询到的结果,根据指标的标签生成一个hash值,然后判断这个hash值是否之前已经存在(即之前是否已经有相同的指标数据返回),如果是,则更新上次的value及annotations,如果不是,则创建一个新的alert并保存至该规则下的active alert列表中。然后遍历规则的active alert列表,根据规则的持续时长配置、alert的上次触发时间、alert的当前状态、本次查询alert是否依然存在等信息来修改alert的状态。具体规则如下:
如果alert之前存在,但本次执行时不存在
- 状态是StatePending或者本次检查时间距离上次触发时间超过15分钟(15分钟为写死的常量),则将该alert从active列表中删除
- 状态不为StateInactive的alert修改为StateInactive
如果alert之前存在并且本次执行仍然存在
- alert的状态是StatePending并且本次检查距离上次触发时间超过配置的for持续时长,那么状态修改为StateFiring
其余情况修改alert的状态为StatePending
上面那一步只是修改了alert的状态,但是并没有真正执行发送告警操作。下面才是真正要执行告警操作:
- //判断规则是否是alert规则,如果是则发送告警信息(具体是否真正发送由ar.sendAlerts中的逻辑判断)
- ifar,ok:=rule.(*AlertingRule);ok{
- ar.sendAlerts(ctx,ts,g.opts.ResendDelay,g.interval,g.opts.NotifyFunc)
- }
- //…….
- func(r*AlertingRule)sendAlerts(ctxcontext.Context,tstime.Time,resendDelaytime.Duration,intervaltime.Duration,notifyFuncNotifyFunc){
- alerts:=[]*Alert{}
- r.ForEachActiveAlert(func(alert*Alert){
- ifalert.needsSending(ts,resendDelay){
- alert.LastSentAt=ts
- //AllowfortwoEvalorAlertmanagersendfailures.
- delta:=resendDelay
- ifinterval>resendDelay{
- delta=interval
- }
- alert.ValidUntil=ts.Add(4*delta)
- anew:=*alert
- alerts=append(alerts,&anew)
- }
- })
- notifyFunc(ctx,r.vector.String(),alerts…)
- }
- func(a*Alert)needsSending(tstime.Time,resendDelaytime.Duration)bool{
- ifa.State==StatePending{
- returnfalse
- }
- //ifanalerthasbeenresolvedsincethelastsend,resendit
- ifa.ResolvedAt.After(a.LastSentAt){
- returntrue
- }
- returna.LastSentAt.Add(resendDelay).Before(ts)
- }
概括一下以上逻辑就是:
- 如果alert的状态是StatePending,则不发送告警
- 如果alert的已经被解决,那么再次发送告警标记该条信息已经被解决
- 如果当前时间距离上次发送告警的时间大于配置的重新发送延时时间(ResendDelay),则发送告警,否则不发送
以上就是prometheus的告警流程。学习这个流程主要是问了能够对prometheus的rules相关的做二次开发。我们可以修改LoadGroups()方法,让其可以动态侧加载定义在mysql中定义的规则,动态实现告警规则更新。
参考:
《深入浅出prometheus原理、应用、源码与拓展详解》
原文链接:https://mp.weixin.qq.com/s/fwHfKYCy_SKJzaiNy-zh7A