当前位置: 首页 > news >正文

网站应该怎么建设用wampserver搭建网站

网站应该怎么建设,用wampserver搭建网站,如何看一个关键词在某个网站是否被百度收录,东南亚跨境电商有哪些平台原理 通过用一个goroutine以及堆来存储要待调度的延迟任务#xff0c;当达到调度时间后#xff0c;将其添加到协程池中去执行。 主要是使用了chan、Mutex、atomic及ants协程池来实现。 用途 主要是用于高并发及大量定时任务要处理的情况#xff0c;如果使用Go协程来实现每…原理 通过用一个goroutine以及堆来存储要待调度的延迟任务当达到调度时间后将其添加到协程池中去执行。 主要是使用了chan、Mutex、atomic及ants协程池来实现。 用途 主要是用于高并发及大量定时任务要处理的情况如果使用Go协程来实现每次延迟任务的调度那么数量极大的goroutine将会占用内存导致性能下降使用协程池实现延迟任务的调度会改善该情况。 如在物联网设备中当连接数量达到几十万时如果使用goroutine来处理心跳或者活跃检测频繁的创建销毁goroutine会影响性能。 特色 在常见的cron等开源框架中使用的是数组存储待调度的任务每次循环时都要排序并且要删除某个任务则时间复杂度是O(n)。 本文通过使用堆及双重Map优化存储待调度的任务使得添加任务时间复杂度为O(log n)获取任务时间复杂度为O(1)删除时间复杂度为O(1)。 调度器并不会真正的删除取消任务当取消任务达到执行时间时会直接continue是为了提高删除效率如果要删除取消任务那么删除的时间复杂度为O(log n)当有极大量任务时会占用一些内存通过空间换时间来提高删除效率下文也提供了删除取消任务的实现根据不同的场景使用不同的定时任务。 API 创建 NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) //创建协程数是1的延迟任务调度器 s, _ : NewSchedule(1)创建一个延迟调度任务器workerNum是协程数量options是ants协程池的配置除了WithMaxBlockingTasks不能配置别的都可以具体参考https://github.com/panjf2000/ants 调度一次 func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (TaskId, error) //1秒后打印一次时间 taskId, _ : s.ScheduleOne(func() {fmt.Println(time.Now()) }, time.Second)重复调度 func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次时间 taskId, _ : s.Schedule(func() {fmt.Println(time.Now()) }, time.Second)取消调度 func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次时间 taskId, _ : s.Schedule(func() {fmt.Println(time.Now()) }, time.Second) //休眠3秒后取消调度 time.Sleep(3 * time.Second) s.CancelTask(taskId)停止调度 func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次时间 taskId, _ : s.Schedule(func() {fmt.Println(time.Now()) }, time.Second) //休眠3秒后停用延迟任务调度器 time.Sleep(3 * time.Second) s.Shutdown()代码 package scheduleimport (container/heaperrorsgithub.com/panjf2000/ants/v2mathsync/atomictime )var (// ErrScheduleShutdown 延迟任务调度器已关闭错误ErrScheduleShutdown errors.New(schedule: schedule is already in shutdown) )const invalidTaskId 0type TaskId uint32 type OriginalTaskId uint32// Schedule 延迟调度的结构体提供延迟调度任务的全部方法 // 通过NewSchedule方法创建Schedule通过Schedule、ScheduleOne方法添加延迟调度任务通过CancelTask方法取消任务通过Shutdown停止延迟任务 type Schedule struct {//任务堆按时间排序taskHeap taskHeap//可执行的任务Mapkey是当前的任务idvalue是任务的第一次原始id用于优化取消任务时需要遍历堆去删除executeTaskIdMap map[TaskId]OriginalTaskId//任务id的Mapkey是任务的第一次原始idvalue是当前的任务id用于优化取消任务时需要遍历堆去删除originalTaskIdMap map[OriginalTaskId]TaskId//调度器是否运行中running atomic.Bool//下一个任务idnextTaskId atomic.Uint32//任务运行池pool *ants.Pool//添加任务ChanaddTaskChan chan *Task//删除任务ChanstopTaskChan chan struct{}//取消任务ChancancelTaskChan chan OriginalTaskId }// NewSchedule 构建一个Schedule // workerNum 工作的协程数量options ants协程池的配置除了WithMaxBlockingTasks不能配置别的都可以具体参考https://github.com/panjf2000/ants func NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) {//延迟任务的最大任务数量必须不限制options append(options, ants.WithMaxBlockingTasks(0))//创建一个协程池pool, err : ants.NewPool(workerNum)if err ! nil {return nil, err}//创建一个延迟调度结构体s : Schedule{taskHeap: make(taskHeap, 0),executeTaskIdMap: make(map[TaskId]OriginalTaskId),originalTaskIdMap: make(map[OriginalTaskId]TaskId),running: atomic.Bool{},nextTaskId: atomic.Uint32{},pool: pool,addTaskChan: make(chan *Task),stopTaskChan: make(chan struct{}),cancelTaskChan: make(chan OriginalTaskId),}//启动调度 会开启一个协程去将即将要调度的任务添加到协程池中运行s.start()return s, nil }// ScheduleOne 添加延迟调度任务只调度一次 // job 执行的方法 duration 周期间隔如果是负数立马执行如果是负数立马且只执行一次 func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, true) }// Schedule 添加延迟调度任务重复调度 // job 执行的方法 duration 周期间隔如果是负数立马且只执行一次 func (s *Schedule) Schedule(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, false) }// doSchedule 添加延迟调度任务的具体实现 func (s *Schedule) doSchedule(job func(), duration time.Duration, onlyOne bool) (uint32, error) {if s.running.Load() {//如果是负数 只执行一次if duration 0 {onlyOne true}nextTaskId : s.getNextTaskId()task : new(Task)task.job jobtask.executeTime time.Now().Add(duration)task.onlyOne onlyOnetask.duration durationtask.originalId OriginalTaskId(nextTaskId)task.id TaskId(nextTaskId)s.addTaskChan - taskreturn uint32(task.originalId), nil} else {return invalidTaskId, ErrScheduleShutdown} }// CancelTask 取消延迟调度任务 // taskId 任务id func (s *Schedule) CancelTask(taskId uint32) {if s.running.Load() {if taskId ! invalidTaskId {s.cancelTaskChan - OriginalTaskId(taskId)}} }// Shutdown 结束延迟任务调度 func (s *Schedule) Shutdown() {//通过cas设值if s.running.CompareAndSwap(true, false) {s.stopTaskChan - struct{}{}} }// IsShutdown 延迟任务调度是否关闭 func (s *Schedule) IsShutdown() bool {return !s.running.Load() }// start 启动延迟任务调度 func (s *Schedule) start() {s.running.Store(true)go func() {for {now : time.Now()var timer *time.Timer//如果没有任务提交睡眠等待任务if s.taskHeap.Len() 0 {timer time.NewTimer(math.MaxUint16 * time.Hour)} else {//查看第一个要执行的任务是否是被取消的task : s.taskHeap.Peek()_, ok : s.executeTaskIdMap[task.id]if !ok {//是被取消的任务移除后continueheap.Pop(s.taskHeap)continue} else {//设置执行间隔timer time.NewTimer(task.executeTime.Sub(now))}}select {case -timer.C://到达第一个任务执行时间task : heap.Pop(s.taskHeap).(*Task)//提交到线程池执行返回的error不需要处理因为任务池是无限大_ s.pool.Submit(task.job)//单次执行则删除多次执行则更新if task.onlyOne {s.removeTask(task.originalId, task.id)} else {s.updateTask(task)}case originalTaskId : -s.cancelTaskChan:timer.Stop()//如果取消的任务id在待执行任务列表中则删除任务if taskId, ok : s.originalTaskIdMap[originalTaskId]; ok {s.removeTask(originalTaskId, taskId)}case task : -s.addTaskChan:timer.Stop()//添加任务s.addTask(task)case -s.stopTaskChan:timer.Stop()//关闭资源s.close()return}}}() }// updateTask 更新延迟调度任务 func (s *Schedule) updateTask(executedTask *Task) {//拷贝 并设置新的执行时间和IDtask : *executedTasktask.executeTime time.Now().Add(task.duration)nextTaskId : s.getNextTaskId()task.id TaskId(nextTaskId)//把已执行的任务删除s.removeTask(invalidTaskId, executedTask.id)//添加新的任务s.addTask(task) }// removeTask 移除任务 func (s *Schedule) removeTask(originalTaskId OriginalTaskId, taskId TaskId) {//如果原始的任务ID不为空则为使用者取消的从任务Map中也删除if originalTaskId ! invalidTaskId {delete(s.originalTaskIdMap, originalTaskId)}delete(s.executeTaskIdMap, taskId) }// addTask 添加任务 func (s *Schedule) addTask(task *Task) {s.originalTaskIdMap[task.originalId] task.ids.executeTaskIdMap[task.id] task.originalIdheap.Push(s.taskHeap, task) }// getNextTaskId 获取下一个任务id func (s *Schedule) getNextTaskId() uint32 {taskId : s.nextTaskId.Add(1)if taskId invalidTaskId {taskId s.nextTaskId.Add(1)}return taskId }// close 关闭Schedule资源和协程池的资源 func (s *Schedule) close() {//关闭所有资源并设置为 nil help gcs.taskHeap nils.executeTaskIdMap nils.originalTaskIdMap nils.pool.Release()s.pool nilclose(s.addTaskChan)close(s.cancelTaskChan)close(s.stopTaskChan)s.addTaskChan nils.cancelTaskChan nils.stopTaskChan nil }// Task 调度任务结构体是一个调度任务的实体信息 type Task struct {// 原始id用于Schedule本身的删除使用用两层Map的方式优化数组删除的O(n)时间复杂度originalId OriginalTaskId// 任务idid TaskId// 执行的时间每次执行完如果重复调度就重新计算executeTime time.Time// 周期间隔duration time.Duration// 执行的任务job func()// 是否只执行一次onlyOne bool }// 任务的堆使用队只需要在添加的时候进行排序堆顶是最先要执行的任务 type taskHeap []*Task// 下面都是堆接口的实现func (t *taskHeap) Len() int {return len(*t) } func (t *taskHeap) Less(i, j int) bool {return (*t)[i].executeTime.Before((*t)[j].executeTime) }func (t *taskHeap) Swap(i, j int) {(*t)[i], (*t)[j] (*t)[j], (*t)[i] }func (t *taskHeap) Push(x interface{}) {*t append(*t, x.(*Task)) }func (t *taskHeap) Pop() interface{} {old : *tn : len(old)x : old[n-1]old[n-1] nil*t old[:n-1]return x }// Peek 查看堆顶元素非堆接口的实现 func (t *taskHeap) Peek() *Task {return (*t)[0] } 代码加上详细的中文注解大约300行。 github地址 https://github.com/xzc-coder/go-schedule 另一个版本的实现删除时间复杂度为O(log n)相对上文中的实现占用的内存会少但是删除效率会变低。 package scheduleimport (container/heaperrorsgithub.com/panjf2000/ants/v2mathsync/atomictime )var (// ErrScheduleShutdown 延迟任务调度器已关闭错误ErrScheduleShutdown errors.New(schedule: schedule is already in shutdown) )const invalidTaskId 0type TaskId uint32// Schedule 延迟调度的结构体提供延迟调度任务的全部方法 // 通过NewSchedule方法创建Schedule通过Schedule、ScheduleOne方法添加延迟调度任务通过CancelTask方法取消任务通过Shutdown停止延迟任务 type Schedule struct {//任务堆按时间排序taskHeap taskHeaptaskMap map[TaskId]*Task//调度器是否运行中running atomic.Bool//下一个任务idnextTaskId atomic.Uint32//任务运行池pool *ants.Pool//添加任务ChanaddTaskChan chan *Task//删除任务ChanstopTaskChan chan struct{}//取消任务ChancancelTaskChan chan TaskId }// NewSchedule 构建一个Schedule // workerNum 工作的协程数量options ants协程池的配置除了WithMaxBlockingTasks不能配置别的都可以具体参考https://github.com/panjf2000/ants func NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) {//延迟任务的最大任务数量必须不限制options append(options, ants.WithMaxBlockingTasks(0))//创建一个协程池pool, err : ants.NewPool(workerNum)if err ! nil {return nil, err}//创建一个延迟调度结构体s : Schedule{taskHeap: make(taskHeap, 0),taskMap: make(map[TaskId]*Task),running: atomic.Bool{},nextTaskId: atomic.Uint32{},pool: pool,addTaskChan: make(chan *Task),stopTaskChan: make(chan struct{}),cancelTaskChan: make(chan TaskId),}//启动调度 会开启一个协程去将即将要调度的任务添加到协程池中运行s.start()return s, nil }// ScheduleOne 添加延迟调度任务只调度一次 // job 执行的方法 duration 周期间隔如果是负数立马执行如果是负数立马且只执行一次 func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, true) }// Schedule 添加延迟调度任务重复调度 // job 执行的方法 duration 周期间隔如果是负数立马且只执行一次 func (s *Schedule) Schedule(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, false) }// doSchedule 添加延迟调度任务的具体实现 func (s *Schedule) doSchedule(job func(), duration time.Duration, onlyOne bool) (uint32, error) {if s.running.Load() {//如果是负数 只执行一次if duration 0 {onlyOne true}nextTaskId : s.getNextTaskId()task : new(Task)task.job jobtask.executeTime time.Now().Add(duration)task.onlyOne onlyOnetask.duration durationtask.id TaskId(nextTaskId)task.index 0s.addTaskChan - taskreturn uint32(task.id), nil} else {return invalidTaskId, ErrScheduleShutdown} }// CancelTask 取消延迟调度任务 // taskId 任务id func (s *Schedule) CancelTask(taskId uint32) {if s.running.Load() {if taskId ! invalidTaskId {s.cancelTaskChan - TaskId(taskId)}} }// Shutdown 结束延迟任务调度 func (s *Schedule) Shutdown() {//通过cas设值if s.running.CompareAndSwap(true, false) {s.stopTaskChan - struct{}{}} }// IsShutdown 延迟任务调度是否关闭 func (s *Schedule) IsShutdown() bool {return !s.running.Load() }// start 启动延迟任务调度 func (s *Schedule) start() {s.running.Store(true)go func() {for {now : time.Now()var timer *time.Timer//如果没有任务提交睡眠等待任务if s.taskHeap.Len() 0 {timer time.NewTimer(math.MaxUint16 * time.Hour)} else {task : s.taskHeap.Peek()//设置执行间隔timer time.NewTimer(task.executeTime.Sub(now))}select {case -timer.C://到达第一个任务执行时间task : heap.Pop(s.taskHeap).(*Task)//提交到线程池执行返回的error不需要处理因为任务池是无限大_ s.pool.Submit(task.job)//单次执行则删除多次执行则更新if task.onlyOne {s.removeTask(false, task)} else {s.updateTask(task)}case taskId : -s.cancelTaskChan:timer.Stop()//如果取消的任务id在待执行任务列表中则删除任务if task, ok : s.taskMap[taskId]; ok {s.removeTask(true, task)}case task : -s.addTaskChan:timer.Stop()//添加任务s.addTask(task)case -s.stopTaskChan:timer.Stop()//关闭资源s.close()return}}}() }// updateTask 更新延迟调度任务 func (s *Schedule) updateTask(executedTask *Task) {//拷贝 并设置新的执行时间和IDtask : *executedTasktask.executeTime time.Now().Add(task.duration)//把已执行的任务删除s.removeTask(false, executedTask)//添加新的任务s.addTask(task) }// removeTask 移除任务 func (s *Schedule) removeTask(removeHeap bool, task *Task) {//从Map和堆中delete(s.taskMap, task.id)if removeHeap {heap.Remove(s.taskHeap, task.index)} }// addTask 添加任务 func (s *Schedule) addTask(task *Task) {heap.Push(s.taskHeap, task)s.taskMap[task.id] task }// getNextTaskId 获取下一个任务id func (s *Schedule) getNextTaskId() uint32 {taskId : s.nextTaskId.Add(1)if taskId invalidTaskId {taskId s.nextTaskId.Add(1)}return taskId }// close 关闭Schedule资源和协程池的资源 func (s *Schedule) close() {//关闭所有资源并设置为 nil help gcs.taskHeap nils.taskMap nils.pool.Release()s.pool nilclose(s.addTaskChan)close(s.cancelTaskChan)close(s.stopTaskChan)s.addTaskChan nils.cancelTaskChan nils.stopTaskChan nil }// Task 调度任务结构体是一个调度任务的实体信息 type Task struct {// 任务idid TaskId// 执行的时间每次执行完如果重复调度就重新计算executeTime time.Time// 周期间隔duration time.Duration// 执行的任务job func()// 是否只执行一次onlyOne bool//所在堆数组的下标位置index int }// 任务的堆使用队只需要在添加的时候进行排序堆顶是最先要执行的任务 type taskHeap []*Task// 下面都是堆接口的实现func (t *taskHeap) Len() int {return len(*t) } func (t *taskHeap) Less(i, j int) bool {return (*t)[i].executeTime.Before((*t)[j].executeTime) }func (t *taskHeap) Swap(i, j int) {(*t)[i], (*t)[j] (*t)[j], (*t)[i](*t)[i].index i(*t)[j].index j }func (t *taskHeap) Push(x interface{}) {*t append(*t, x.(*Task)) }func (t *taskHeap) Pop() interface{} {old : *tn : len(old)x : old[n-1]old[n-1] nil*t old[:n-1]return x }// Peek 查看堆顶元素非堆接口的实现 func (t *taskHeap) Peek() *Task {return (*t)[0] }
http://www.dnsts.com.cn/news/78706.html

相关文章:

  • 视频网站模板源码网站定制制作公司
  • 有什么软件做短视频网站分销系统开发公司
  • wordpress 小说站ip可以用wordpress
  • 做网站谈单刷网站流量有用吗
  • 网站推广计划怎么做提高工作效率
  • 宜兴做网站的公司新手站长做游戏网站好吗
  • 网站做标签页关于做网站公司周年大促销
  • 怎么给网站制作二维码四川网站建设 四川冠辰科技
  • 西安网站建设哪家公司好网站备案幕布照片尺寸
  • 贵阳网站建设咨询wordpress计费查询
  • 天津网站建设工作室唐山网站快速排名提升
  • logo设计网站免费无水印淘宝推广方法有哪些
  • 网站如何做才可以微信直接登录网上学设计的培训机构
  • 光明楼网站建设小程序推广的十种方式
  • 心理咨询网站平台建设网站文章发布
  • 网站没有索引量是什么意思韩国有哪些做潮牌的网站
  • 阴阳师网站建设商务网站建设sz886
  • 福建住房和城乡建设部网站ui设计好就业吗
  • 中山移动网站建设报价wordpress 网摘插件
  • 装修公司网站怎么建设公司做网站能抵扣进项税吗
  • 西宁网站搭建企业网站的规划与建设
  • 成都网站建设排名建设电动车官网
  • 网站设计属于什么分类号网站建设费用 无形资产
  • 郑州网站开发招聘快速装修
  • 建立网站服务的公司网站江西建筑工程网
  • 东莞营销网站建设公司山西seo博客
  • 0716网站建设网络优化的工作内容有哪些
  • 重庆市建设工程信息官方网站报价单模板免费下载
  • 对网站的赏析怎么让网站快速收录
  • 科网站建设无锡网站制作建设