小型静态网站是什么原因,分销系统源代码,搭建平台的另一种说法,南充网站建设迅达网络event 事件记录初始化 
一般在控制器都会有如下的初始化函数#xff0c;初始化 event 记录器等参数 
1. 创建 EventBroadcaster 
record.NewBroadcaster(): 创建事件广播器#xff0c;用于记录和分发事件。StartLogging(klog.Infof): 将事件以日志的形式输出。StartRecording…event 事件记录初始化 
一般在控制器都会有如下的初始化函数初始化 event 记录器等参数 
1. 创建 EventBroadcaster 
record.NewBroadcaster(): 创建事件广播器用于记录和分发事件。StartLogging(klog.Infof): 将事件以日志的形式输出。StartRecordingToSink: 将事件发送到 Kubernetes API Server存储为 Event 资源。 
2. 创建 EventRecorder 
NewRecorder(scheme, source)从广播器中创建事件记录器。 scheme: 用于验证和序列化资源。source: 指定事件的来源如 example-controller。  
import k8s.io/client-go/tools/recordfunc (c *controller) Initialize(opt *framework.ControllerOption) error {// ...// 1. 创建事件广播器 eventBroadcastereventBroadcaster : record.NewBroadcaster()// 将 event 记录到 logeventBroadcaster.StartLogging(klog.Infof)// 将 event 记录到 apiserver// c.kubeClient.CoreV1().Events() 这个是创建一个可以操作任意 ns 下 event 的 clienteventBroadcaster.StartRecordingToSink(corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events()})// 2. 基于事件广播器 创建事件记录器 Recorderc.recorder  eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: example-controller})
}// 事件的记录
const Create-Reason  PodCreate
func (c *controller)Controller_Do_Something(pod *corev1.Pod){newPod: pod.DeepCopy()// 生成个 event并记录// 内容为 newPod 创建成功event等级为 NormalReason 是 PodCreateMessage 是 Create Pod succeed// 之后 Recorder 内的 eventBroadcaster 会将此 event 广播出去然后 eventBroadcaster 之前注册的日志记录和event存储逻辑会执行// 日志记录逻辑会通过 klog.Infof 将此 event 打印出来// event存储逻辑会将此 event 存储到 apiserverc.recorder.Event(newPod, v1.EventTypeNormal, Create-Reason, Create Pod succeed)
}源码解析 0- 总体逻辑设计 控制中心 Broadcaster  eventBroadcaster : record.NewBroadcaster() 创建一个公共数据源或理解为总控中心也可以称之为控制器但不是k8s 控制器 返回的是eventBroadcasterImpl 结构体其封装了Broadcaster结构体因此 eventBroadcasterImpl 结构体的字段很丰富  Broadcaster 中的字段主要记录处理 event 的监听者watchers以及分发的控制等 eventBroadcaster.StartLogging(klog.Infof) 就是一个 watchereventBroadcaster.StartRecordingToSink(corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events()}) 也是个 watcher这些 watcher 都会被记录到Broadcaster结构体的watchers map[int64]*broadcasterWatcher 的map 中  eventBroadcasterImpl 在 Broadcaster 基础上增加少量配置参数和控制函数   Event 分发和 watcher 处理逻辑 eventBroadcaster : record.NewBroadcaster() 执行过程中会调用 watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) 函数其会开启 event 分发逻辑go m.loop() go m.loop() 用于处理 event 的分发读取eventBroadcasterImpl 中 incoming chan Event  通道传来的 event分发给各个 watcher 的 result channel incoming 中的 event 是由 Recorder 写入的Recorder.Event 会生成个 event 并发送到incomimg channel 中go m.loop()函数会读取incoming通道中的 Event发送个各个watcher, 然后各个watcher执行自己的逻辑如记录为 info级别日志、或写入apiserver等 同时为了避免主进程的结束导致go m.loop()进程结束NewLongQueueBroadcaster 还利用distributing sync.WaitGroup变量进行 m.distributing.Add(1)让主进程等待避免主进程快速结束导致 loop 进程结束 StartLogging 或 StartRecordingToSink 函数会调用 StartEventWatcher 函数 StartEventWatcher 函数将传入的参数变为一个 event处理函数 eventHandler, StartEventWatcher 函数同时会开启一个 go 协程读取各自 watcher result channel 中的 event之后用eventHandler进行处理如记录为 info级别日志、或写入apiserver等  Event 产生逻辑  Recorder 是由 eventBroadcaster.NewRecorder 创建出来的相当于对eventBroadcasterImpl中Broadcaster 的封装  Recorder.Event 会生成个 event 并发送到incomimg channel 中  Recorder 会利用Broadcaster的incoming channel 写入 event  Recorder 会利用Broadcaster的incomingBlock控制写入时的并发避免同一时间写入 event 过多导致错乱这部分逻辑在blockQueue 函数中    
1- 控制中心的创建 —— NewBroadcaster 函数 
创建的eventBroadcaster 实际上就是创建一个 eventBroadcasterImpl 结构体并传入一些配置参数进行初始化注意 eventBroadcasterImpl封装了Broadcaster结构体 注意Broadcaster中有很多channel 、watchers和分发相关控制、并发控制字段等 eventBroadcaster.StartLogging(klog.Infof) 就是一个 watchereventBroadcaster.StartRecordingToSink(corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events()}) 也是个 watcher这些 watcher 都会被记录到watchers map[int64]*broadcasterWatcher 的map 中 基于eventBroadcaster 创建的Recorder实际上级就是对eventBroadcasterImpl结构体的封装之后Recorder创建 event 时会传入到eventBroadcasterImpl 内 Broadcaster  
// 路径 mod/k8s.io/client-gov0.29.0/tools/record/event.go
const maxQueuedEvents  1000
type FullChannelBehavior int
const (WaitIfChannelFull FullChannelBehavior  iotaDropIfChannelFull
)// Creates a new event broadcaster.
func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {c : config{sleepDuration: defaultSleepDuration,}for _, opt : range opts {opt(c)}// 重点关注eventBroadcaster : eventBroadcasterImpl{Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),sleepDuration: c.sleepDuration,options:       c.CorrelatorOptions,}ctx : c.Contextif ctx  nil {ctx  context.Background()} else {// Calling Shutdown is not required when a context was provided:// when the context is canceled, this goroutine will shut down// the broadcaster.go func() {-ctx.Done()eventBroadcaster.Broadcaster.Shutdown()}()}eventBroadcaster.cancelationCtx, eventBroadcaster.cancel  context.WithCancel(ctx)// 重点关注return eventBroadcaster
}// 路径 mod/k8s.io/apimachineryv0.29.0/pkg/watch/mux.go
// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
// except that the incoming queue is the same size as the outgoing queues
// (specified by queueLength).
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {m : Broadcaster{watchers:            map[int64]*broadcasterWatcher{},incoming:            make(chan Event, queueLength),stopped:             make(chan struct{}),watchQueueLength:    queueLength,fullChannelBehavior: fullChannelBehavior,}m.distributing.Add(1)	// distributing sync.WaitGroup, 1 个进程go m.loop()  					// loop 进程很关键! 处理 event 的分发分发给 watcher 处理		return m
}1.1- eventBroadcasterImpl 结构体 
// 路径 mod/k8s.io/client-gov0.29.0/tools/record/event.go
type eventBroadcasterImpl struct {*watch.Broadcaster  // 此处引用下面的结构体sleepDuration  time.Durationoptions        CorrelatorOptionscancelationCtx context.Contextcancel         func()
}// 路径 /mod/k8s.io/apimachineryv0.29.0/pkg/watch/mux.go
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {watchers     map[int64]*broadcasterWatcher  // map 结构  id 和 watcher 的映射nextWatcher  int64													// 下一个 watcher 该分配的 iddistributing sync.WaitGroup									// 用于保证分发函数 loop 正常运行避免主函数停止导致 loop 函数停止// incomingBlock allows us to ensure we dont race and end up sending events// to a closed channel following a broadcaster shutdown.incomingBlock sync.Mutex										// 避免接收 event 时event 过多导致的并发因此需要锁进行控制incoming      chan Event										// 承接生成的 event其他 watcher 会从此 channel 中读取 event 进行记录到 apiserver 或日志打印等stopped       chan struct{}									// 承接关闭广播器 Broadcaster 的停止信号// How large to make watchers channel.watchQueueLength int// If one of the watch channels is full, dont wait for it to become empty.// Instead just deliver it to the watchers that do have space in their// channels and move on to the next event.// Its more fair to do this on a per-watcher basis than to do it on the// incoming channel, which would allow one slow watcher to prevent all// other watchers from getting new events.fullChannelBehavior FullChannelBehavior
}1.2- EventBroadcaster 接口 
// 路径 mod/k8s.io/client-gov0.29.0/tools/record/event.go
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {// StartEventWatcher starts sending events received from this EventBroadcaster to the given// event handler function. The return value can be ignored or used to stop recording, if// desired.StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface// StartRecordingToSink starts sending events received from this EventBroadcaster to the given// sink. The return value can be ignored or used to stop recording, if desired.StartRecordingToSink(sink EventSink) watch.Interface// StartLogging starts sending events received from this EventBroadcaster to the given logging// function. The return value can be ignored or used to stop recording, if desired.StartLogging(logf func(format string, args ...interface{})) watch.Interface// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured// logging function. The return value can be ignored or used to stop recording, if desired.StartStructuredLogging(verbosity klog.Level) watch.Interface// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster// with the event source set to the given event source.NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger// Shutdown shuts down the broadcaster. Once the broadcaster is shut// down, it will only try to record an event in a sink once before// giving up on it with an error message.Shutdown()
}1.3- NewRecorder 接口的实现 
Recorder 封装了 Broadcaster 
// 路径 mod/k8s.io/client-gov0.29.0/tools/record/event.go// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {return recorderImplLogger{recorderImpl: recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}type recorderImplLogger struct {*recorderImpllogger klog.Logger
}type recorderImpl struct {scheme *runtime.Schemesource v1.EventSource*watch.Broadcasterclock clock.PassiveClock
}1.3- loopevent的分发 
// // 路径 /mod/k8s.io/apimachineryv0.29.0/pkg/watch/mux.go
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {// Deliberately not catching crashes here. Yes, bring down the process if theres a// bug in watch.Broadcaster.for event : range m.incoming {if event.Type  internalRunFunctionMarker {event.Object.(functionFakeRuntimeObject)()continue}m.distribute(event)  // 将 event 分发给 watcher}m.closeAll()m.distributing.Done()
}// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {if m.fullChannelBehavior  DropIfChannelFull {for _, w : range m.watchers {select {case w.result - event: // 将 event 发送到 watcher 的 result channel等待 watcher 进行处理case -w.stopped:default: // Dont block if the event cant be queued.}}} else {for _, w : range m.watchers {select {case w.result - event:case -w.stopped:}}}
}1.4 event 的产生 
// 路径 mod/k8s.io/client-gov0.29.0/tools/record/event.go// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {// Event constructs an event from the given information and puts it in the queue for sending.// object is the object this event is about. Event will make a reference-- or you may also// pass a reference to the object directly.// eventtype of this event, and can be one of Normal, Warning. New types could be added in future// reason is the reason this event is generated. reason should be short and unique; it// should be in UpperCamelCase format (starting with a capital letter). reason will be used// to automate handling of events, so imagine people writing switch statements to handle them.// You want to make that easy.// message is intended to be human readable.//// The resulting event will be created in the same namespace as the reference object.Event(object runtime.Object, eventtype, reason, message string)// Eventf is just like Event, but with Sprintf for the message field.Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})// AnnotatedEventf is just like eventf, but with annotations attachedAnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
}func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {ref, err : ref.GetReference(recorder.scheme, object)if err ! nil {logger.Error(err, Could not construct reference, will not report event, object, object, eventType, eventtype, reason, reason, message, message)return}if !util.ValidateEventType(eventtype) {logger.Error(nil, Unsupported event type, eventType, eventtype)return}event : recorder.makeEvent(ref, annotations, eventtype, reason, message)event.Source  recorder.sourceevent.ReportingInstance  recorder.source.Hostevent.ReportingController  recorder.source.Component// NOTE: events should be a non-blocking operation, but we also need to not// put this in a goroutine, otherwise well race to write to a closed channel// when we go to shut down this broadcaster.  Just drop events if we get overloaded,// and log an error if that happens (weve configured the broadcaster to drop// outgoing events anyway).sent, err : recorder.ActionOrDrop(watch.Added, event)if err ! nil {logger.Error(err, Unable to record event (will not retry!))return}if !sent {logger.Error(nil, Unable to record event: too many queued events, dropped event, event, event)}
}// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up.  Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {m.incomingBlock.Lock()defer m.incomingBlock.Unlock()// Ensure that if the broadcaster is stopped we do not send events to it.select {case -m.stopped:return false, fmt.Errorf(broadcaster already stopped)default:}select {case m.incoming - Event{action, obj}:return true, nildefault:return false, nil}
}1.5 watcher 的处理 
eventBroadcaster.StartLogging(klog.Infof)// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {return e.StartEventWatcher(func(e *v1.Event) {  // 对应 下面 eventHandlerlogf(Event(%#v): type: %v reason: %v %v, e.InvolvedObject, e.Type, e.Reason, e.Message)})
}// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {watcher, err : e.Watch()if err ! nil {klog.FromContext(e.cancelationCtx).Error(err, Unable start event watcher (will not retry!))}go func() {  // 直接运行了defer utilruntime.HandleCrash()for {select {case -e.cancelationCtx.Done():watcher.Stop()returncase watchEvent : -watcher.ResultChan():  // 从 watcher result channel 中取出 event event, ok : watchEvent.Object.(*v1.Event)if !ok {// This is all local, so theres no reason this should// ever happen.continue}eventHandler(event) // 对 event 进行处理 }}}()return watcher
} 
附录1 | 示例详解 
以下是一个完整的 EventRecorder 和 EventBroadcaster 实例化的代码示例展示如何在 Kubernetes 控制器中记录事件。代码包含详细注释适合用于实际开发或学习 代码示例 
package mainimport (contextfmttimecorev1 k8s.io/api/core/v1k8s.io/apimachinery/pkg/runtimemetav1 k8s.io/apimachinery/pkg/runtime/schemak8s.io/client-go/kubernetesk8s.io/client-go/kubernetes/fakek8s.io/client-go/tools/recordk8s.io/client-go/util/workqueuek8s.io/klog/v2
)// 主要逻辑的入口
func main() {// 1. 创建 Kubernetes 客户端// 这里我们使用 fake 客户端进行演示生产环境应替换为真实的 kubernetes.Clientsetclientset : fake.NewSimpleClientset()// 2. 创建事件广播器EventBroadcaster// 事件广播器是事件处理的核心负责分发事件到日志和 API ServereventBroadcaster : record.NewBroadcaster()// 3. 启动日志记录功能// 通过 klog.Infof 输出事件到控制台或日志文件eventBroadcaster.StartLogging(klog.Infof)// 4. 启动事件的 API Server 记录功能// 配置事件接收器将事件发送到 API Server通过 kubeClient.CoreV1().Events 接口记录事件eventBroadcaster.StartRecordingToSink(corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(),})// 5. 创建事件记录器EventRecorder// EventRecorder 用于开发者实际记录事件recorder : eventBroadcaster.NewRecorder(scheme(), // 提供资源的模式信息常用的是 runtime.NewScheme() 或自定义的 schemecorev1.EventSource{Component: example-controller},)// 6. 模拟一个 Kubernetes 对象如 Pod的引用// 事件通常需要与具体的 Kubernetes 资源关联objRef : corev1.ObjectReference{Kind:       Pod,                  // 资源类型Namespace:  default,              // 命名空间Name:       example-pod,          // 资源名称UID:        12345-abcde-67890,    // 唯一标识符APIVersion: v1,                   // API 版本}// 7. 使用 EventRecorder 记录事件// 记录一个正常类型的事件EventTypeNormalrecorder.Eventf(objRef, corev1.EventTypeNormal, PodCreated, Successfully created Pod %s, objRef.Name)// 模拟一个警告事件EventTypeWarningrecorder.Eventf(objRef, corev1.EventTypeWarning, PodFailed, Failed to create Pod %s due to insufficient resources, objRef.Name)// 模拟一个控制器逻辑的操作processQueue(recorder, objRef)// 等待事件记录完成time.Sleep(2 * time.Second)
}// 模拟处理队列的函数
func processQueue(recorder record.EventRecorder, objRef *corev1.ObjectReference) {// 创建一个工作队列queue : workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 将任务添加到队列queue.Add(task1)// 模拟处理队列中的任务for queue.Len()  0 {item, _ : queue.Get()defer queue.Done(item)// 记录一个事件表明任务已处理recorder.Eventf(objRef, corev1.EventTypeNormal, TaskProcessed, Successfully processed task: %v, item)fmt.Printf(Processed task: %v\n, item)}
}// 创建 Scheme 用于事件记录器
// Scheme 是 Kubernetes 中资源的模式定义用于确定资源类型和序列化方式
func scheme() *runtime.Scheme {s : runtime.NewScheme()// 添加 CoreV1 资源到 Scheme 中corev1.AddToScheme(s)metav1.AddToGroupVersion(s, schema.GroupVersion{Version: v1})return s
}代码详解 
1. 创建 EventBroadcaster 
record.NewBroadcaster(): 创建事件广播器用于记录和分发事件。StartLogging(klog.Infof): 将事件以日志的形式输出。StartRecordingToSink: 将事件发送到 Kubernetes API Server存储为 Event 资源。 
2. 创建 EventRecorder 
NewRecorder(scheme, source)从广播器中创建事件记录器。 scheme: 用于验证和序列化资源。source: 指定事件的来源如 example-controller。  
3. 创建对象引用 
ObjectReference: 用于标识事件关联的 Kubernetes 资源。包括类型、名称、命名空间、UID 等信息。 
4. 记录事件 Eventf 用于记录事件包括   类型corev1.EventTypeNormal正常或 corev1.EventTypeWarning警告。原因事件发生的原因如 PodCreated。消息事件的详细描述。  
5. 模拟队列任务 
使用 workqueue 模拟处理任务记录任务完成时的事件。 运行结果 
日志输出 
事件将输出到日志通过 klog 
I1119 12:34:56.123456   12345 example.go:52] Event(v1.ObjectReference{...}): type: Normal reason: PodCreated message: Successfully created Pod example-pod
I1119 12:34:56.123457   12345 example.go:53] Event(v1.ObjectReference{...}): type: Warning reason: PodFailed message: Failed to create Pod example-pod due to insufficient resources
Processed task: task1事件存储 
如果使用真实客户端事件会存储在集群中可通过 kubectl 查看 
kubectl get events事件输出 
LAST SEEN   TYPE      REASON         OBJECT            MESSAGE
5s          Normal    PodCreated     Pod/example-pod   Successfully created Pod example-pod
5s          Warning   PodFailed      Pod/example-pod   Failed to create Pod example-pod due to insufficient resources
5s          Normal    TaskProcessed  Pod/example-pod   Successfully processed task: task1代码用途 
日志记录和事件管理 帮助开发者跟踪控制器的运行状态和资源变更。任务队列处理 将业务逻辑与事件机制结合记录每个关键操作的状态。 以上代码展示了如何使用 EventRecorder 和 EventBroadcaster 实现 Kubernetes 控制器中的事件管理适合用于开发自定义控制器或调试集群事件处理逻辑。 
附录2 | stoped 变量的作用 
在 NewBroadcaster 函数中的 stopped 通道用于实现对 Broadcaster 对象的停止和关闭控制。具体来说它的作用是在广播器的生命周期中进行信号传递用于通知 Broadcaster 是否已经停止运行。 
详细分析 
1. 通道的定义 
stopped: make(chan struct{}),stopped 是一个无缓冲的通道类型为 struct{}。无缓冲的通道用于信号传递表示某个事件的发生而不需要传递具体数据。这里的 struct{} 是一个空结构体占用零内存因此不会传递任何实际数据。 
2. 停止广播器的作用 
stopped 通道用于在广播器停止时传递一个信号。通常这种信号用于通知相关的 goroutine 或者处理流程广播器已经停止工作可以做一些清理操作或者退出。 
3. 与 go m.loop() 的配合 
go m.loop()在 NewBroadcaster 中启动了一个新的 goroutine 来执行 m.loop()。这个 loop 方法通常是处理传入事件并进行分发的核心逻辑。loop 方法可能会定期检查 stopped 通道判断是否已经收到停止信号。 
4. 典型的停止逻辑假设 
func (m *Broadcaster) loop() {for {select {case event : -m.incoming:// 处理事件逻辑case -m.stopped:// 收到停止信号后退出 goroutinereturn}}
}在这个假设的 loop 实现中select 语句等待从 m.incoming 通道接收事件或者等待 stopped 通道的信号。当收到 stopped 通道的信号时loop 方法退出从而停止事件的分发。 
5. 停止广播器的触发 
在实际代码中某个外部操作可能会通过 close(m.stopped) 或者发送一个信号到 m.stopped 通道来通知 Broadcaster 停止工作。比如在处理完所有事件或者发生错误时可能会调用停止操作 
close(m.stopped)或者 
m.stopped - struct{}{}这样 loop 就会检测到停止信号并退出。 总结 
stopped 通道在 Broadcaster 中的作用是提供一个停止信号通知正在运行的 goroutine如 loop 方法停止执行。这种设计使得 Broadcaster 能够优雅地处理停止操作确保所有 goroutine 都能够适时退出并清理资源。