当前位置: 首页 > news >正文

徐州丰县建设局网站品牌网站建设 飞沐

徐州丰县建设局网站,品牌网站建设 飞沐,乌兰县wap网站建设公司,wordpress主题zip前言 最近做项目#xff0c;还是K8S的插件监听器#xff08;理论上插件都是通过API-server通信#xff09;#xff0c;官方的不同写法居然都能出现争议#xff0c;争议点就是对API-Server的请求的耗时#xff0c;说是会影响API-Server。实际上通过源码分析两着有差别还是K8S的插件监听器理论上插件都是通过API-server通信官方的不同写法居然都能出现争议争议点就是对API-Server的请求的耗时说是会影响API-Server。实际上通过源码分析两着有差别但是差别不大对API-Server的影响几乎一样。 老式写法 package mainimport (controller/controlv1 k8s.io/api/core/v1k8s.io/apimachinery/pkg/fieldsk8s.io/client-go/kubernetesk8s.io/client-go/tools/cachek8s.io/client-go/tools/clientcmdk8s.io/client-go/util/workqueuek8s.io/klog/v2 )func main() {// 读取构建 configconfig, err : clientcmd.BuildConfigFromFlags(, xxx/config)if err ! nil {klog.Fatal(err)}// 创建 k8s clientclientSet, err : kubernetes.NewForConfig(config)if err ! nil {klog.Fatal(err)}// 指定 ListWatcher 在所有namespace下监听 pod 资源podListWatcher : cache.NewListWatchFromClient(clientSet.CoreV1().RESTClient(), pods, v1.NamespaceAll, fields.Everything())// 创建 workqueuequeue : workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 创建 indexer 和 informerindexer, informer : cache.NewIndexerInformer(podListWatcher, v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{// 当有 pod 创建时根据 Delta queue 弹出的 object 生成对应的Key并加入到 workqueue中。此处可以根据Object的一些属性进行过滤AddFunc: func(obj interface{}) {key, err : cache.MetaNamespaceKeyFunc(obj)if err nil {queue.Add(key)}},UpdateFunc: func(obj, newObj interface{}) {key, err : cache.MetaNamespaceKeyFunc(newObj)if err nil {queue.Add(key)}},// pod 删除操作DeleteFunc: func(obj interface{}) {// DeletionHandlingMetaNamespaceKeyFunc 会在生成key 之前检查。因为资源删除后有可能会进行重建等操作监听时错过了删除信息从而导致该条记录是陈旧的。key, err : cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err nil {queue.Add(key)}},}, cache.Indexers{})controller : control.NewController(queue, indexer, informer)stop : make(chan struct{})defer close(stop)// 启动 controlgo controller.Run(1, stop)select {} }然后写个Controller代码 package controlimport (fmtv1 k8s.io/api/core/v1k8s.io/apimachinery/pkg/util/runtimek8s.io/apimachinery/pkg/util/waitk8s.io/client-go/tools/cachek8s.io/client-go/util/workqueuek8s.io/klog/v2time )type Controller struct {indexer cache.Indexer // Indexer 的引用queue workqueue.RateLimitingInterface //workqueue 的引用informer cache.Controller // Informer 的引用 }func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {return Controller{indexer: indexer,queue: queue,informer: informer,} }func (c *Controller) Run(threadiness int, stopCh chan struct{}) {defer runtime.HandleCrash()defer c.queue.ShutDown()klog.Info(Starting pod control)go c.informer.Run(stopCh) // 启动 informerif !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {runtime.HandleError(fmt.Errorf(time out waitng for caches to sync))return}// 启动多个 worker 处理 workqueue 中的对象for i : 0; i threadiness; i {go wait.Until(c.runWorker, time.Second, stopCh)}-stopChklog.Info(Stopping Pod control) }func (c *Controller) runWorker() {// 启动无限循环接收并处理消息for c.processNextItem() {} }// 从 workqueue 中获取对象并打印信息。 func (c *Controller) processNextItem() bool {key, shutdown : c.queue.Get()// 退出if shutdown {return false}// 标记此key已经处理defer c.queue.Done(key)// 将key对应的 object 的信息进行打印err : c.syncToStdout(key.(string))c.handleError(err, key)return true }// 获取 key 对应的 object并打印相关信息 func (c *Controller) syncToStdout(key string) error {obj, exists, err : c.indexer.GetByKey(key)if err ! nil {klog.Errorf(Fetching object with key %s from store failed with %v, key, err)return err}if !exists {fmt.Printf(Pod %s does not exist\n, obj.(*v1.Pod).GetName())} else {fmt.Printf(Sync/Add/Update for Pod %s\n, obj.(*v1.Pod).GetName())}return nil }func (c *Controller) handleError(err error, key interface{}) {}这总写法的好处是自己处理各个环节Informer和indexer那个queue仅仅是队列从cache缓存取数据用的实际看看创建过程 创建lw的过程 cache.NewListWatchFromClient // NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector. func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {optionsModifier : func(options *metav1.ListOptions) {options.FieldSelector fieldSelector.String()}return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier) }// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier. // Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function // to apply modification to ListOptions with a field selector, a label selector, or any other desired options. func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {listFunc : func(options metav1.ListOptions) (runtime.Object, error) {optionsModifier(options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(options, metav1.ParameterCodec).Do(context.TODO()).Get()}watchFunc : func(options metav1.ListOptions) (watch.Interface, error) {options.Watch trueoptionsModifier(options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(options, metav1.ParameterCodec).Watch(context.TODO())}return ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} } ListAndWatch方法函数指针关键是List和Watch的函数跟新的写法有些许区别 创建Informer 此处默认使用DeletionHandlingMetaNamespaceKeyFunc函数创建key func NewIndexerInformer(lw ListerWatcher,objType runtime.Object,resyncPeriod time.Duration,h ResourceEventHandler,indexers Indexers, ) (Indexer, Controller) {// This will hold the client state, as we know it.clientState : NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) }func newInformer(lw ListerWatcher,objType runtime.Object,resyncPeriod time.Duration,h ResourceEventHandler,clientState Store,transformer TransformFunc, ) Controller {// This will hold incoming changes. Note how we pass clientState in as a// KeyLister, that way resync operations will result in the correct set// of update/delete deltas.fifo : NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects: clientState,EmitDeltaTypeReplaced: true,})cfg : Config{Queue: fifo,ListerWatcher: lw,ObjectType: objType,FullResyncPeriod: resyncPeriod,RetryOnError: false,Process: func(obj interface{}) error {if deltas, ok : obj.(Deltas); ok {return processDeltas(h, clientState, transformer, deltas)}return errors.New(object given as Process argument is not Deltas)},}return New(cfg) }func New(c *Config) Controller {ctlr : controller{config: *c,clock: clock.RealClock{},}return ctlr } 这里注意消费delta队列的过程 这里是没有加锁的即Process函数指针 另外实际上还是创建controller内置结构体也是client-go创建的。 新式写法 config, err : clientcmd.BuildConfigFromFlags(, ~/.kube/config)//注意路径if err ! nil {log.Fatal(err)}//这2行是抓包的时候使用日常是不需要的config.TLSClientConfig.CAData nilconfig.TLSClientConfig.Insecure trueclientSet, err : kubernetes.NewForConfig(config)if err ! nil {log.Fatal(err)}//这里可以调一些参数defaultResync很关键factory : informers.NewSharedInformerFactoryWithOptions(clientSet, 0, informers.WithNamespace(default))informer : factory.Core().V1().Pods().Informer()//获取pod的informer实际上使用client-go的api很多informer都创建了直接拿过来用避免使用的时候重复创建informer.AddEventHandler(xxx) //事件处理是一个回调hookstopper : make(chan struct{}, 1)go informer.Run(stopper)log.Println(----- list and watch pod starting...)sigs : make(chan os.Signal, 1)signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)-sigsclose(stopper)log.Println(main stopped...) 实际上就是很多过程封装了比如创建Controller的过程 lw的创建过程 func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions ! nil {tweakListOptions(options)}return client.CoreV1().Pods(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions ! nil {tweakListOptions(options)}return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)},},corev1.Pod{},resyncPeriod,indexers,) } 实际上实现是有pod实现的List最后取结果略有区别 // List takes label and field selectors, and returns the list of Pods that match those selectors. func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {var timeout time.Durationif opts.TimeoutSeconds ! nil {timeout time.Duration(*opts.TimeoutSeconds) * time.Second}result v1.PodList{}err c.client.Get().Namespace(c.ns).Resource(pods).VersionedParams(opts, scheme.ParameterCodec).Timeout(timeout).Do(ctx).Into(result)return }// Watch returns a watch.Interface that watches the requested pods. func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {var timeout time.Durationif opts.TimeoutSeconds ! nil {timeout time.Duration(*opts.TimeoutSeconds) * time.Second}opts.Watch truereturn c.client.Get().Namespace(c.ns).Resource(pods).VersionedParams(opts, scheme.ParameterCodec).Timeout(timeout).Watch(ctx) } 最关键的一点超时老式写法是没有超时设置的超时的重要性不言而喻推荐使用新写法 indexer的创建 默认使用MetaNamespaceIndexFunc函数创建key func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } 创建Informer的同时创建indexer func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {realClock : clock.RealClock{}sharedIndexInformer : sharedIndexInformer{processor: sharedProcessor{clock: realClock},indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),listerWatcher: lw,objectType: exampleObject,resyncCheckPeriod: defaultEventHandlerResyncPeriod,defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf(%T, exampleObject)),clock: realClock,}return sharedIndexInformer }// NewIndexer returns an Indexer implemented simply with a map and a lock. func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {return cache{cacheStorage: NewThreadSafeStore(indexers, Indices{}),keyFunc: keyFunc,} } 除了创建key的函数不同其他一模一样 但是解析delta队列确加了锁 func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()if deltas, ok : obj.(Deltas); ok {return processDeltas(s, s.indexer, s.transform, deltas)}return errors.New(object given as Process argument is not Deltas) } 实际上http请求而言http response关闭后http的访问就结束了本地加锁仅仅会影响本地的执行效率api-server无影响 根源 从代码分析两种写法没有区别对API-Server造成的影响仅仅是Http response的解析老式写法解析后直接返回新式写法的意思是创建结构体然后结构体去处理值并带上了超时时间。 那么为什么API-Server觉得一次请求时间很长呢比如List的过程Watch是长轮询不涉及请求时长根源在于API-Server在低版本测试版本1.20.x分页参数会失效。笔者自己尝试的1.25.4分页是有效的。估计是中间某次提交修复了笔者在github看到很多关于List的提交优化 还有 1.25.4的API-Server的List过程 func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {return func(w http.ResponseWriter, req *http.Request) {// For performance tracking purposes. 创建埋点trace : utiltrace.New(List, traceFields(req)...)namespace, err : scope.Namer.Namespace(req)if err ! nil {scope.err(err, w, req)return}// Watches for single objects are routed to this function.// Treat a name parameter the same as a field selector entry.hasName : true_, name, err : scope.Namer.Name(req)if err ! nil {hasName false}ctx : req.Context()ctx request.WithNamespace(ctx, namespace)outputMediaType, _, err : negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)if err ! nil {scope.err(err, w, req)return}opts : metainternalversion.ListOptions{}if err : metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, opts); err ! nil {err errors.NewBadRequest(err.Error())scope.err(err, w, req)return}if errs : metainternalversionvalidation.ValidateListOptions(opts); len(errs) 0 {err : errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: ListOptions}, , errs)scope.err(err, w, req)return}// transform fields// TODO: DecodeParametersInto should do this.if opts.FieldSelector ! nil {fn : func(label, value string) (newLabel, newValue string, err error) {return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value)}if opts.FieldSelector, err opts.FieldSelector.Transform(fn); err ! nil {// TODO: allow bad request to set field causes based on query parameterserr errors.NewBadRequest(err.Error())scope.err(err, w, req)return}}if hasName {// metadata.name is the canonical internal name.// SelectionPredicate will notice that this is a request for// a single object and optimize the storage query accordingly.nameSelector : fields.OneTermEqualSelector(metadata.name, name)// Note that fieldSelector setting explicitly the metadata.name// will result in reaching this branch (as the value of that field// is propagated to requestInfo as the name parameter.// That said, the allowed field selectors in this branch are:// nil, fields.Everything and field selector matching metadata.name// for our name.if opts.FieldSelector ! nil !opts.FieldSelector.Empty() {selectedName, ok : opts.FieldSelector.RequiresExactMatch(metadata.name)if !ok || name ! selectedName {scope.err(errors.NewBadRequest(fieldSelector metadata.name doesnt match requested name), w, req)return}} else {opts.FieldSelector nameSelector}}if opts.Watch || forceWatch {if rw nil {scope.err(errors.NewMethodNotSupported(scope.Resource.GroupResource(), watch), w, req)return}// TODO: Currently we explicitly ignore ?timeout and use only ?timeoutSeconds.timeout : time.Duration(0)if opts.TimeoutSeconds ! nil {timeout time.Duration(*opts.TimeoutSeconds) * time.Second}if timeout 0 minRequestTimeout 0 {timeout time.Duration(float64(minRequestTimeout) * (rand.Float64() 1.0))}klog.V(3).InfoS(Starting watch, path, req.URL.Path, resourceVersion, opts.ResourceVersion, labels, opts.LabelSelector, fields, opts.FieldSelector, timeout, timeout)ctx, cancel : context.WithTimeout(ctx, timeout)defer cancel()watcher, err : rw.Watch(ctx, opts)if err ! nil {scope.err(err, w, req)return}requestInfo, _ : request.RequestInfoFrom(ctx)metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {serveWatch(watcher, scope, outputMediaType, req, w, timeout)})return}// Log only long List requests (ignore Watch).defer trace.LogIfLong(500 * time.Millisecond) //超过500ms就埋点打印日志这个埋点非常好用建议使用trace.Step(About to List from storage)result, err : r.List(ctx, opts) //API-Server实际上也是去ETCD取数据if err ! nil {scope.err(err, w, req)return}trace.Step(Listing from storage done)defer trace.Step(Writing http response done, utiltrace.Field{count, meta.LenList(result)})transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)} 可以看出超过500毫秒就会打印数据笔者测试差不多500个pod的List就是差不多500毫秒少一点Client-Go设计默认分页参数就是500条精确设计。 // GetList implements storage.Interface. func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {preparedKey, err : s.prepareKey(key)if err ! nil {return err}recursive : opts.RecursiveresourceVersion : opts.ResourceVersionmatch : opts.ResourceVersionMatchpred : opts.Predicatetrace : utiltrace.New(fmt.Sprintf(List(recursive%v) etcd3, recursive),utiltrace.Field{audit-id, endpointsrequest.GetAuditIDTruncated(ctx)},utiltrace.Field{key, key},utiltrace.Field{resourceVersion, resourceVersion},utiltrace.Field{resourceVersionMatch, match},utiltrace.Field{limit, pred.Limit},utiltrace.Field{continue, pred.Continue})defer trace.LogIfLong(500 * time.Millisecond)listPtr, err : meta.GetItemsPtr(listObj)if err ! nil {return err}v, err : conversion.EnforcePtr(listPtr)if err ! nil || v.Kind() ! reflect.Slice {return fmt.Errorf(need ptr to slice: %v, err)} 去读取ETCD3的数据可以试试把k8s的低版本安装上debug试试。分析limit失效的原因笔者是高版本的K8S是已经修复版本。自定义的埋点List的代码 package mainimport (contextfmtv1 k8s.io/api/core/v1k8s.io/apimachinery/pkg/api/metametav1 k8s.io/apimachinery/pkg/apis/meta/v1k8s.io/apimachinery/pkg/fieldsk8s.io/apimachinery/pkg/runtimek8s.io/apimachinery/pkg/watchk8s.io/client-go/kubernetesk8s.io/client-go/tools/cachek8s.io/client-go/tools/pagerk8s.io/utils/tracetime )func TimeNewFilteredPodInformer(client *kubernetes.Clientset) error {options : metav1.ListOptions{ResourceVersion: 0}initTrace : trace.New(Reflector ListAndWatch, trace.Field{Key: name, Value: r.name})defer initTrace.LogIfLong(1 * time.Millisecond)var list runtime.Objectvar paginatedResult boolvar err errorlistCh : make(chan struct{}, 1)panicCh : make(chan interface{}, 1)go func() {defer func() {if r : recover(); r ! nil {panicCh - r}}()// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first// list request will return the full response.pager : pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {lw : cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {return client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {return client.CoreV1().Pods(v1.NamespaceAll).Watch(context.TODO(), options)},}return lw.List(opts)}))list, paginatedResult, err pager.List(context.Background(), options)initTrace.Step(Objects listed: )fmt.Println(list END, is pager , paginatedResult)if err ! nil {fmt.Println(error is : , err.Error())}close(listCh)}()select {case r : -panicCh:panic(r)case -listCh:}initTrace.Step(Resource version extracted)items, err : meta.ExtractList(list)fmt.Println(list items size is : , len(items))if err ! nil {return fmt.Errorf(unable to understand list result %#v (%v), list, err)}initTrace.Step(Objects extracted)return nil }func TimeNewIndexerInformer(client *kubernetes.Clientset) error {options : metav1.ListOptions{ResourceVersion: 0}initTrace : trace.New(Reflector ListAndWatch, trace.Field{Key: name, Value: r.name})defer initTrace.LogIfLong(1 * time.Millisecond)var list runtime.Objectvar paginatedResult boolvar err errorlistCh : make(chan struct{}, 1)panicCh : make(chan interface{}, 1)go func() {defer func() {if r : recover(); r ! nil {panicCh - r}}()// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first// list request will return the full response.pager : pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {lw : cache.NewListWatchFromClient(client.CoreV1().RESTClient(), pods, v1.NamespaceAll, fields.Everything())return lw.List(opts)}))list, paginatedResult, err pager.List(context.Background(), options)initTrace.Step(Objects listed: )fmt.Println(list END, is pager , paginatedResult)if err ! nil {fmt.Println(error is : , err.Error())}close(listCh)}()select {case r : -panicCh:panic(r)case -listCh:}initTrace.Step(Resource version extracted)items, err : meta.ExtractList(list)fmt.Println(list items size is : , len(items))if err ! nil {return fmt.Errorf(unable to understand list result %#v (%v), list, err)}initTrace.Step(Objects extracted)return nil }trace的包好用这里使用的k8s的包实际上sdk基础包也有相似的功能。 func (t *Trace) durationIsWithinThreshold() bool {if t.endTime nil { // we dont assume incomplete traces meet the thresholdreturn false}return t.threshold nil || *t.threshold 0 || t.endTime.Sub(t.startTime) *t.threshold } 总结 知其然知其所以然要想知道为什么分页不生效需要自定义API-Server debug才行看代码很难看出原因因为K8S实际上估计设计的时候也考虑过这个。
http://www.dnsts.com.cn/news/88338.html

相关文章:

  • 做网站需要的相关知识自动关联已发布文章wordpress
  • 做一个自己的网站网上商城建设多少钱
  • 凡科网站制作教程自贡网站开发
  • 开锁在百度上做网站要钱吗wordpress wdlog主题
  • 吴江网站建设收费做网站选择系统
  • 地方网站 o2o娄底网站建设企业
  • wordpress后台左上角盐城seo 优化
  • 网站推广专业外贸建站seo
  • 网站跳转是什么意思途牛企业网站建设方案
  • 谷歌seo收费十堰seo按天计费
  • 企业网站设计哪个好网站优化 福州
  • 买的网站模板会影响提高网站建设水平意见方案
  • 爱站网 关键词挖掘工具站中山市网站开发
  • 网站的动画广告横幅怎么做的软件技术开发合同
  • 大连网站运营wordpress评论框高度
  • 医院建设网站意义公司网页设计费计入什么科目
  • 公司网站建设合同交印花税吗宁波网站建设计
  • 运城建设局网站网页界面设计一般使用的分辨率
  • 网站建设788gg中国免费网站服务器下载
  • 网站建设后期修改江苏省交通工程建设局网站
  • 哪个网站可以做英文兼职为网站做电影花絮
  • 钓鱼网站制作利鑫做彩票网站
  • 做外贸的物流网站有哪些如何创办一个赚钱的网站
  • 湛江建设局网站如何购买大量客户电话号码
  • 做色流网站服务器项目总结
  • 永久免费网站建设方案书签制作过程
  • 个人建设网站服务器怎么解决方案seo费用价格
  • 本溪做网站手机app应用制作
  • 开发网站类型申报城市维护建设税上哪个网站
  • 温州专业全网推广建站公司亚马逊网络营销方式