idou老師教你學Istio 27:解讀Mixer Report流程
1、概述

1、概述
Mixer是Istio的核心組件,提供了遙測數據收集的功能,能夠實時采集服務的請求狀態等信息,以達到監控服務狀態目的。
1.1 核心功能
?前置檢查(Check):某服務接收并響應外部請求前,先通過Envoy向Mixer(Policy組件)發送Check請求,做一些access檢查,同時確認adaptor所需cache字段,供之后Report接口使用;
?配額管理(Quota):通過配額管理機制,處理多請求時發生的資源競爭;
?遙測數據上報(Report):該服務請求處理結束后,將請求相關的日志,監控等數據,通過Envoy上報給Mixer(telemetry)
1.2 示例圖
2、代碼分析
2.1 Report代碼分析
本節主要介紹Report的詳細流程(基于Istio release1.0.0版本,commit id為3a136c90)。Report是mixer server的一個接口,供Envoy通過grpc調用。首先,我們從mixer server的啟動入口main函數看起:
func?main()?{ ???rootCmd?:=?cmd.GetRootCmd(os.Args[1:],?supportedTemplates(),?supportedAdapters(),?shared.Printf,?shared.Fatalf) ???if?err?:=?rootCmd.Execute();?err?!=?nil?{ ??????os.Exit(-1) ???} }
在rootCmd中,mixs通過server命令啟動了mixer server,從而觸發了runserver函數,在runserver中初始化(New)了一個server,我們一起看下newServer的函數,在這個函數中,與我們本節相關的內容就是Mixer初始化了一個grpc服務器NewGRPCServer。
rootCmd.AddCommand(serverCmd(info,?adapters,?printf,?fatalf)) func?serverCmd(info?map[string]template.Info,?adapters?[]adapter.InfoFn,?printf,?fatalf?shared.FormatFn)?*cobra.Command?{ ???sa?:=?server.DefaultArgs() ???sa.Templates?=?info ???sa.Adapters?=?adapters ???serverCmd?:=?&cobra.Command{ ??????Use:???"server", ??????Short:?"Starts?Mixer?as?a?server", ??????Run:?func(cmd?*cobra.Command,?args?[]string)?{ ?????????runServer(sa,?printf,?fatalf) ??????}, ???}…?… } func?newServer(a?*Args,?p?*patchTable)?(*Server,?error)?{ ???grpc.EnableTracing?=?a.EnableGRPCTracing ???s.server?=?grpc.NewServer(grpcOptions...) ???mixerpb.RegisterMixerServer(s.server,?api.NewGRPCServer(s.dispatcher,?s.gp,?s.checkCache)) }
rootC
在這個grpc的服務端中,定義了一個Report接口,這就是我們這節課主要關注的內容(可以看到Check接口也在此定義,我們下節再講)
func?(s?*grpcServer)?Report(ctx?context.Context,?req?*mixerpb.ReportRequest)?(*mixerpb.ReportResponse,?error)?{ ???lg.Debugf("Report?(Count:?%d)",?len(req.Attributes)) ???//?校驗attribute是否為空,空則直接return ???if?len(req.Attributes)?==?0?{ ??????return?reportResp,?nil ???} ???//?若屬性word為空,賦為默認值 ???for?i?:=?0;?i??0?{ ?????????if?err?:=?accumBag.UpdateBagFromProto(&req.Attributes[i],?s.globalWordList);?err?!=?nil?{ ????????????err?=?fmt.Errorf("request?could?not?be?processed?due?to?invalid?attributes:?%v",?err) ????????????span.LogFields(otlog.String("error",?err.Error())) ????????????span.Finish() ????????????errors?=?multierror.Append(errors,?err) ????????????break ?????????} ??????} ??????lg.Debug("Dispatching?Preprocess") ??????//?真正開始分發,預處理階段 ??????if?err?:=?s.dispatcher.Preprocess(newctx,?accumBag,?reportBag);?err?!=?nil?{ ?????????err?=?fmt.Errorf("preprocessing?attributes?failed:?%v",?err) ?????????span.LogFields(otlog.String("error",?err.Error())) ?????????span.Finish() ?????????errors?=?multierror.Append(errors,?err) ?????????continue ??????} ??????lg.Debug("Dispatching?to?main?adapters?after?running?preprocessors") ??????lg.Debuga("Attribute?Bag:?\n",?reportBag) ??????lg.Debugf("Dispatching?Report?%d?out?of?%d",?i+1,?len(req.Attributes)) ??????//?真正開始分發,將數據逐步加入到緩存中 ??????if?err?:=?reporter.Report(reportBag);?err?!=?nil?{ ?????????span.LogFields(otlog.String("error",?err.Error())) ?????????span.Finish() ?????????errors?=?multierror.Append(errors,?err) ?????????continue ??????} ??????span.Finish() ??????//?purge?the?effect?of?the?Preprocess?call?so?that?the?next?time?through?everything?is?clean ??????reportBag.Reset() ???} ???reportBag.Done() ???accumBag.Done() ???protoBag.Done() ???//?真正的發送函數,從緩存中取出并發送到adaptor ???if?err?:=?reporter.Flush();?err?!=?nil?{ ??????errors?=?multierror.Append(errors,?err) ???} ???reporter.Done() ???if?errors?!=?nil?{ ??????reportSpan.LogFields(otlog.String("error",?errors.Error())) ???} ???reportSpan.Finish() ???if?errors?!=?nil?{ ??????lg.Errora("Report?failed:",?errors.Error()) ??????return?nil,?grpc.Errorf(codes.Unknown,?errors.Error()) ???} ???//?過程結束 ???return?reportResp,?nil }
通過上述代碼解讀,我們了解了Report接口的工作流程,但此時我們還并不知道一個請求的狀態是如何報給adaptor的,下面我們通過簡要的函數串接,把這部分流程串起來:
上述的預處理階段Preprocess與上報階段Report,最終都會調用到dispatch函數,僅通過不同的type來區分要做的事情;
func?(d?*Impl)?Preprocess(ctx?context.Context,?bag?attribute.Bag,?responseBag?*attribute.MutableBag)?error?{ ???s?:=?d.getSession(ctx,?tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR,?bag) ???s.responseBag?=?responseBag ???err?:=?s.dispatch() ???if?err?==?nil?{ ??????err?=?s.err ???} ???…?… } func?(r?*reporter)?Report(bag?attribute.Bag)?error?{ ???s?:=?r.impl.getSession(r.ctx,?tpb.TEMPLATE_VARIETY_REPORT,?bag) ???s.reportStates?=?r.states ???err?:=?s.dispatch() ???if?err?==?nil?{ ??????err?=?s.err ???} ???…?… }
而dispatch函數中做了真正的分發動作,包括:
1.遍歷所有adaptor,調用adaptor中的函數,針對不同的adaptor生成不同的instance,并將instance緩存放入reportstates
var?instance?interface{} if?instance,?err?=?input.Builder(s.bag);?err?!=?nil?{ ???log.Errorf("error?creating?instance:?destination='%v',?error='%v'",?destination.FriendlyName,?err) ???s.err?=?multierror.Append(s.err,?err) ???continue } type?NamedBuilder?struct?{ ???InstanceShortName?string ???Builder???????????template.InstanceBuilderFn } InstanceBuilderFn?func(attrs?attribute.Bag)?(interface{},?error) CreateInstanceBuilder:?func(instanceName?string,?param?proto.Message,?expb?*compiled.ExpressionBuilder)?(template.InstanceBuilderFn,?error) builder.build(attr) //?For?report?templates,?accumulate?instances?as?much?as?possible?before?commencing?dispatch. if?s.variety?==?tpb.TEMPLATE_VARIETY_REPORT?{ ???state.instances?=?append(state.instances,?instance) ???continue }
2.將instance分發到所有adaptor,最終調用并分發到adaptor的HandleMetric函數中
func?(r?*reporter)?Flush()?error?{ ???s?:=?r.impl.getSession(r.ctx,?tpb.TEMPLATE_VARIETY_REPORT,?nil) ???s.reportStates?=?r.states ???s.dispatchBufferedReports() ???err?:=?s.err ???…?… } func?(s?*session)?dispatchBufferedReports()?{ ???//?Ensure?that?we?can?run?dispatches?to?all?destinations?in?parallel. ???s.ensureParallelism(len(s.reportStates)) ???//?dispatch?the?buffered?dispatchStates?we've?got ???for?k,?v?:=?range?s.reportStates?{ ??????s.dispatchToHandler(v) ??????delete(s.reportStates,?k) ???} ???s.waitForDispatched() } func?(s?*session)?dispatchToHandler(ds?*dispatchState)?{ ???s.activeDispatches++ ???ds.session?=?s ???s.impl.gp.ScheduleWork(ds.invokeHandler,?nil) } case?tpb.TEMPLATE_VARIETY_REPORT: ???ds.err?=?ds.destination.Template.DispatchReport( ??????ctx,?ds.destination.Handler,?ds.instances) type?TemplateInfo?struct?{ ???Name?????????????string ???Variety??????????tpb.TemplateVariety ???DispatchReport???template.DispatchReportFn ???DispatchCheck????template.DispatchCheckFn ???DispatchQuota????template.DispatchQuotaFn ???DispatchGenAttrs?template.DispatchGenerateAttributesFn } DispatchReport:?func(ctx?context.Context,?handler?adapter.Handler,?inst?[]interface{})?error?{ ???//?Convert?the?instances?from?the?generic?[]interface{},?to?their?specialized?type. ???instances?:=?make([]*metric.Instance,?len(inst)) ???for?i,?instance?:=?range?inst?{ ??????instances[i]?=?instance.(*metric.Instance) ???} ???//?Invoke?the?handler. ???if?err?:=?handler.(metric.Handler).HandleMetric(ctx,?instances);?err?!=?nil?{ ??????return?fmt.Errorf("failed?to?report?all?values:?%v",?err) ???} ???return?nil }
2.2 相關結構體定義
Report接口請求體定義
//?Used?to?report?telemetry?after?performing?one?or?more?actions. type?ReportRequest?struct?{ ???//?代表一個請求中的屬性 ???//?每個attribute代表一個請求動作,多個動作可匯總在一條message中以提高效率 ???//雖然每個“屬性”消息在語義上被視為與消息中的其他屬性無關的獨立獨立實體,但此消息格式利用屬性消息之間的增量編碼,以便大幅減少請求大小并改進端到端?效率。?每組單獨的屬性用于修改前一組。?這消除了在單個請求中多次冗余地發送相同屬性的需要。 ???//?如果客戶端上報時不想使用增量編碼,可全量的發送所有屬性. ???Attributes?[]CompressedAttributes?`protobuf:"bytes,1,rep,name=attributes"?json:"attributes"` ???//?所有屬性的默認消息級字典. ???//?這使得可以為該請求中的所有屬性共享相同的字典,這可以大大減少整體請求大小 ???DefaultWords?[]string?`protobuf:"bytes,2,rep,name=default_words,json=defaultWords"?json:"default_words,omitempty"` ???//?全局字典的詞條數,可檢測客戶端與服務端之間的全局字典是否同步 ???GlobalWordCount?uint32?`protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3"?json:"global_word_count,omitempty"` }
3、總結
Mixer中涉及很多緩存命中等用于優化性能的設計,本文僅介紹了Mixer中Report接口發送到adaptor的過程,一些性能優化設計,如protobag,dispatch緩存等內容,將會在后續文章中解析。
相關服務請訪問https://support.huaweicloud.com/cce/index.html?cce_helpcenter_2019
Istio
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。