golang에서 지연된 작업을 구현하는 방법은 무엇입니까? 다음 글은 golang을 기반으로 한 일련의 지연된 작업 솔루션을 여러분과 공유할 것입니다. 이것이 여러분에게 도움이 되기를 바랍니다!
실제 비즈니스 시나리오에서는 때때로 지연 요구 사항이 발생할 수 있습니다. 예를 들어 전자 상거래 플랫폼에서 운영자가 관리 백그라운드에 제품을 추가한 후 프런트 데스크에 즉시 표시할 필요가 없습니다. , 그러나 특정 시점에 표시됩니다.
물론 우리는 이 문제를 해결하기 위한 많은 아이디어를 가지고 있습니다. 예를 들어, 출시할 제품 정보를 db에 추가한 후 예약된 작업을 통해 데이터 테이블을 폴링하여 현재 시점에 출시된 제품을 쿼리하는 또 다른 예는 모든 제품 정보를 redis에 추가하고 이를 완료하는 것입니다. SortSet 속성을 통해 작동합니다. 최종 선택은 당사의 비즈니스 시나리오와 운영 환경에 따라 다릅니다.
여기서는 golang을 기반으로 한 지연 작업 솔루션 세트를 여러분과 공유하고 싶습니다.
우리 모두는 모든 종류의 대기열이 실제로 생산자와 소비자라는 두 부분으로 구성되어 있다는 것을 알고 있습니다. 지연된 작업에는 일반 대기열에 비해 추가 지연 기능이 있습니다.
1. 생산자
생산자의 관점에서 사용자가 작업을 푸시하면 지연된 실행 시간 값이 전달됩니다. 이 작업이 예정된 시간에 실행되도록 하려면 이 작업을 일정 시간 동안 메모리에 저장해야 하며 시간은 1차원적이고 증가합니다. 그렇다면 이를 저장하기 위해 어떤 데이터 구조를 사용해야 할까요?
(1) 지도 중 하나를 선택하세요. 맵이 무질서하고 실행 시간에 따라 정렬할 수 없기 때문에 수행된 작업이 현재 시점에서 실행되어야 하는지 여부를 보장할 수 없으므로 이 옵션은 제외됩니다. (2) 선택 2: 채널. 실제로 채널은 때때로 대기열로 간주될 수 있습니다. 그러나 채널의 출력과 입력은 "선입선출" 원칙을 엄격하게 따릅니다. 불행하게도 고급 작업이 먼저 실행되지 않을 수 있으므로 채널은 적합하지 않습니다. (3) 선택 3:
slice. 슬라이스 요소는 순서가 지정되어 있기 때문에 슬라이싱이 가능해 보입니다. 따라서 모든 슬라이스 요소를 실행 시간 순서대로 배열할 수 있다면 매번 슬라이스의 헤드 요소(아마도 테일 요소)만 읽으면 됩니다. 우리가 원하는 작업.
2. 소비자
소비자 입장에서 보면 각 작업을 특정 시점에 어떻게 소비하게 만들느냐가 가장 큰 어려움입니다. 그렇다면 각 작업에 대해 실행하기 전에 일정 시간 동안 기다리게 하려면 어떻게 해야 할까요? 네, 타이머입니다. 결론적으로 "슬라이싱 + 타이머" 조합이 목적을 달성할 수 있어야 합니다.
단계별
1. 데이터 흐름
(1) 사용자는InitDelayQueue()를 호출하여 지연된 작업 개체를 초기화합니다.
(2) 코루틴을 열고 작업 작업 파이프라인(추가/삭제 신호)과 실행 시간 파이프라인(timer.C 신호)을 수신합니다.(3) 사용자가 추가/삭제 신호를 보냅니다. (4) (2)의 코루틴은 (3)의 신호를 캡처하고 작업 목록을 변경합니다.
(5) 작업 실행 시점이 도래하면(timer.C 파이프라인에서 출력되는 요소가 있는 경우) 작업을 실행합니다.2. 데이터 구조
(1) 지연된 작업 개체
// 延时任务对象 type DelayQueue struct { tasks []*task // 存储任务列表的切片 add chan *task // 用户添加任务的管道信号 remove chan string // 用户删除任务的管道信号 waitRemoveTaskMapping map[string]struct{} // 等待删除的任务id列表 }
여기에
waitRemoveTaskMapping필드가 있다는 점에 유의해야 합니다. 삭제할 작업이 아직 추가 파이프라인에 있고 작업 필드에서 적시에 업데이트되지 않았을 수 있으므로 고객이 삭제하려는 작업의 ID를 임시로 기록해야 합니다.
(2) 태스크 객체
// 任务对象 type task struct { id string // 任务id execTime time.Time // 执行时间 f func() // 执行函数 }
3. 초기화 지연 태스크 객체
// 初始化延时任务对象 func InitDelayQueue() *DelayQueue { q := &DelayQueue{ add: make(chan *task, 10000), remove: make(chan string, 100), waitRemoveTaskMapping: make(map[string]struct{}), } return q }이 과정에서 태스크에 대한 사용자의 조작 신호와 태스크의 실행 시간 신호를 모니터링해야 합니다.
func (q *DelayQueue) start() { for { // to do something... select { case now := <-timer.C: // 任务执行时间信号 // to do something... case t := <-q.add: // 任务推送信号 // to do something... case id := <-q.remove: // 任务删除信号 // to do something... } } }
// 初始化延时任务对象 func InitDelayQueue() *DelayQueue { q := &DelayQueue{ add: make(chan *task, 10000), remove: make(chan string, 100), waitRemoveTaskMapping: make(map[string]struct{}), } // 开启协程,监听任务相关信号 go q.start() return q }
4. 생산자가 작업을 푸시할 때 추가 파이프라인에 작업을 추가하기만 하면 됩니다. 여기서는 작업 ID를 생성하고 사용자에게 반환합니다. . 在这里,我们要将用户推送的任务放到延时任务的tasks字段中。由于,我们需要将任务按照执行时间顺序排序,所以,我们需要找到新增任务在切片中的插入位置。又因为,插入之前的任务列表已经是有序的,所以,我们可以采用二分法处理。 找到正确的插入位置后,我们才能将任务准确插入: 那么,在监听add管道的时候,我们直接调用上述addTask() 即可。 在这里,我们可以遍历任务列表,根据删除任务的id找到其在切片中的对应index。 然后,我们可以完善start()方法了。 start()执行的时候,分成两种情况:任务列表为空,只需要监听add管道即可;任务列表不为空的时候,需要监听所有管道。任务执行信号,主要是依靠timer来实现,属于第二种情况。 执行任务: 结束任务,刷新任务列表: delay_queue.go 测试代码:delay_queue_test.go 头脑风暴 上面的方案,的确实现了延时任务的效果,但是其中仍然有一些问题,仍然值得我们思考和优化。 1、按照上面的方案,如果大量延时任务的执行时间,集中在同一个时间点,会造成短时间内timer频繁地创建和销毁。 2、上述方案相比于time.AfterFunc()方法,我们需要在哪些场景下作出取舍。 3、如果服务崩溃或重启,如何去持久化队列中的任务。 本文和大家讨论了延时任务在golang中的一种实现方案,在这个过程中,一次性定时器timer、切片、管道等golang特色,以及二分插入等常见算法都体现得淋漓尽致。// 用户推送任务
func (q *DelayQueue) Push(timeInterval time.Duration, f func()) string {
// 生成一个任务id,方便删除使用
id := genTaskId()
t := &task{
id: id,
execTime: time.Now().Add(timeInterval),
f: f,
}
// 将任务推到add管道中
q.add <- t
return id
}
5、任务推送信号的处理
// 使用二分法判断新增任务的插入位置
func (q *DelayQueue) getTaskInsertIndex(t *task, leftIndex, rightIndex int) (index int) {
if len(q.tasks) == 0 {
return
}
length := rightIndex - leftIndex
if q.tasks[leftIndex].execTime.Sub(t.execTime) >= 0 {
// 如果当前切片中最小的元素都超过了插入的优先级,则插入位置应该是最左边
return leftIndex
}
if q.tasks[rightIndex].execTime.Sub(t.execTime) <= 0 {
// 如果当前切片中最大的元素都没超过插入的优先级,则插入位置应该是最右边
return rightIndex + 1
}
if length == 1 && q.tasks[leftIndex].execTime.Before(t.execTime) && q.tasks[rightIndex].execTime.Sub(t.execTime) >= 0 {
// 如果插入的优先级刚好在仅有的两个优先级之间,则中间的位置就是插入位置
return leftIndex + 1
}
middleVal := q.tasks[leftIndex+length/2].execTime
// 这里用二分法递归的方式,一直寻找正确的插入位置
if t.execTime.Sub(middleVal) <= 0 {
return q.getTaskInsertIndex(t, leftIndex, leftIndex+length/2)
} else {
return q.getTaskInsertIndex(t, leftIndex+length/2, rightIndex)
}
}
// 将任务添加到任务切片列表中
func (q *DelayQueue) addTask(t *task) {
// 寻找新增任务的插入位置
insertIndex := q.getTaskInsertIndex(t, 0, len(q.tasks)-1)
// 找到了插入位置,更新任务列表
q.tasks = append(q.tasks, &task{})
copy(q.tasks[insertIndex+1:], q.tasks[insertIndex:])
q.tasks[insertIndex] = t
}
func (q *DelayQueue) start() {
for {
// to do something...
select {
case now := <-timer.C:
// 任务执行时间信号
// to do something...
case t := <-q.add:
// 任务推送信号
q.addTask(t)
case id := <-q.remove:
// 任务删除信号
// to do something...
}
}
}
6、生产者删除任务
// 用户删除任务
func (q *DelayQueue) Delete(id string) {
q.remove <- id
}
7、任务删除信号的处理
// 删除指定任务
func (q *DelayQueue) deleteTask(id string) {
deleteIndex := -1
for index, t := range q.tasks {
if t.id == id {
// 找到了在切片中需要删除的所以呢
deleteIndex = index
break
}
}
if deleteIndex == -1 {
// 如果没有找到删除的任务,说明任务还在add管道中,来不及更新到tasks中,这里我们就将这个删除id临时记录下来
// 注意,这里暂时不考虑,任务id非法的特殊情况
q.waitRemoveTaskMapping[id] = struct{}{}
return
}
if len(q.tasks) == 1 {
// 删除后,任务列表就没有任务了
q.tasks = []*task{}
return
}
if deleteIndex == len(q.tasks)-1 {
// 如果删除的是,任务列表的最后一个元素,则执行下列代码
q.tasks = q.tasks[:len(q.tasks)-1]
return
}
// 如果删除的是,任务列表的其他元素,则需要将deleteIndex之后的元素,全部向前挪动一位
copy(q.tasks[deleteIndex:len(q.tasks)-1], q.tasks[deleteIndex+1:len(q.tasks)-1])
q.tasks = q.tasks[:len(q.tasks)-1]
return
}
func (q *DelayQueue) start() {
for {
// to do something...
select {
case now := <-timer.C:
// 任务执行时间信号
// to do something...
case t := <-q.add:
// 任务推送信号
q.addTask(t)
case id := <-q.remove:
// 任务删除信号
q.deleteTask(id)
}
}
}
8、任务执行信号的处理
func (q *DelayQueue) start() {
for {
if len(q.tasks) == 0 {
// 任务列表为空的时候,只需要监听add管道
select {
case t := <-q.add:
//添加任务
q.addTask(t)
}
continue
}
// 任务列表不为空的时候,需要监听所有管道
// 任务的等待时间=任务的执行时间-当前的时间
currentTask := q.tasks[0]
timer := time.NewTimer(currentTask.execTime.Sub(time.Now()))
select {
case now := <-timer.C:
// 任务执行信号
timer.Stop()
if _, isRemove := q.waitRemoveTaskMapping[currentTask.id]; isRemove {
// 之前客户已经发出过该任务的删除信号,因此需要结束任务,刷新任务列表
q.endTask()
delete(q.waitRemoveTaskMapping, currentTask.id)
continue
}
// 开启协程,异步执行任务
go q.execTask(currentTask, now)
// 任务结束,刷新任务列表
q.endTask()
case t := <-q.add:
// 任务推送信号
timer.Stop()
q.addTask(t)
case id := <-q.remove:
// 任务删除信号
timer.Stop()
q.deleteTask(id)
}
}
}
// 执行任务
func (q *DelayQueue) execTask(task *task, currentTime time.Time) {
if task.execTime.After(currentTime) {
// 如果当前任务的执行时间落后于当前时间,则不执行
return
}
// 执行任务
task.f()
return
}
// 一个任务去执行了,刷新任务列表
func (q *DelayQueue) endTask() {
if len(q.tasks) == 1 {
q.tasks = []*task{}
return
}
q.tasks = q.tasks[1:]
}
9、完整代码
package delay_queue
import (
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
// 延时任务对象
type DelayQueue struct {
tasks []*task // 存储任务列表的切片
add chan *task // 用户添加任务的管道信号
remove chan string // 用户删除任务的管道信号
waitRemoveTaskMapping map[string]struct{} // 等待删除的任务id列表
}
// 任务对象
type task struct {
id string // 任务id
execTime time.Time // 执行时间
f func() // 执行函数
}
// 初始化延时任务对象
func InitDelayQueue() *DelayQueue {
q := &DelayQueue{
add: make(chan *task, 10000),
remove: make(chan string, 100),
waitRemoveTaskMapping: make(map[string]struct{}),
}
// 开启协程,监听任务相关信号
go q.start()
return q
}
// 用户删除任务
func (q *DelayQueue) Delete(id string) {
q.remove <- id
}
// 用户推送任务
func (q *DelayQueue) Push(timeInterval time.Duration, f func()) string {
// 生成一个任务id,方便删除使用
id := genTaskId()
t := &task{
id: id,
execTime: time.Now().Add(timeInterval),
f: f,
}
// 将任务推到add管道中
q.add <- t
return id
}
// 监听各种任务相关信号
func (q *DelayQueue) start() {
for {
if len(q.tasks) == 0 {
// 任务列表为空的时候,只需要监听add管道
select {
case t := <-q.add:
//添加任务
q.addTask(t)
}
continue
}
// 任务列表不为空的时候,需要监听所有管道
// 任务的等待时间=任务的执行时间-当前的时间
currentTask := q.tasks[0]
timer := time.NewTimer(currentTask.execTime.Sub(time.Now()))
select {
case now := <-timer.C:
timer.Stop()
if _, isRemove := q.waitRemoveTaskMapping[currentTask.id]; isRemove {
// 之前客户已经发出过该任务的删除信号,因此需要结束任务,刷新任务列表
q.endTask()
delete(q.waitRemoveTaskMapping, currentTask.id)
continue
}
// 开启协程,异步执行任务
go q.execTask(currentTask, now)
// 任务结束,刷新任务列表
q.endTask()
case t := <-q.add:
// 添加任务
timer.Stop()
q.addTask(t)
case id := <-q.remove:
// 删除任务
timer.Stop()
q.deleteTask(id)
}
}
}
// 执行任务
func (q *DelayQueue) execTask(task *task, currentTime time.Time) {
if task.execTime.After(currentTime) {
// 如果当前任务的执行时间落后于当前时间,则不执行
return
}
// 执行任务
task.f()
return
}
// 一个任务去执行了,刷新任务列表
func (q *DelayQueue) endTask() {
if len(q.tasks) == 1 {
q.tasks = []*task{}
return
}
q.tasks = q.tasks[1:]
}
// 将任务添加到任务切片列表中
func (q *DelayQueue) addTask(t *task) {
// 寻找新增任务的插入位置
insertIndex := q.getTaskInsertIndex(t, 0, len(q.tasks)-1)
// 找到了插入位置,更新任务列表
q.tasks = append(q.tasks, &task{})
copy(q.tasks[insertIndex+1:], q.tasks[insertIndex:])
q.tasks[insertIndex] = t
}
// 删除指定任务
func (q *DelayQueue) deleteTask(id string) {
deleteIndex := -1
for index, t := range q.tasks {
if t.id == id {
// 找到了在切片中需要删除的所以呢
deleteIndex = index
break
}
}
if deleteIndex == -1 {
// 如果没有找到删除的任务,说明任务还在add管道中,来不及更新到tasks中,这里我们就将这个删除id临时记录下来
// 注意,这里暂时不考虑,任务id非法的特殊情况
q.waitRemoveTaskMapping[id] = struct{}{}
return
}
if len(q.tasks) == 1 {
// 删除后,任务列表就没有任务了
q.tasks = []*task{}
return
}
if deleteIndex == len(q.tasks)-1 {
// 如果删除的是,任务列表的最后一个元素,则执行下列代码
q.tasks = q.tasks[:len(q.tasks)-1]
return
}
// 如果删除的是,任务列表的其他元素,则需要将deleteIndex之后的元素,全部向前挪动一位
copy(q.tasks[deleteIndex:len(q.tasks)-1], q.tasks[deleteIndex+1:len(q.tasks)-1])
q.tasks = q.tasks[:len(q.tasks)-1]
return
}
// 寻找任务的插入位置
func (q *DelayQueue) getTaskInsertIndex(t *task, leftIndex, rightIndex int) (index int) {
// 使用二分法判断新增任务的插入位置
if len(q.tasks) == 0 {
return
}
length := rightIndex - leftIndex
if q.tasks[leftIndex].execTime.Sub(t.execTime) >= 0 {
// 如果当前切片中最小的元素都超过了插入的优先级,则插入位置应该是最左边
return leftIndex
}
if q.tasks[rightIndex].execTime.Sub(t.execTime) <= 0 {
// 如果当前切片中最大的元素都没超过插入的优先级,则插入位置应该是最右边
return rightIndex + 1
}
if length == 1 && q.tasks[leftIndex].execTime.Before(t.execTime) && q.tasks[rightIndex].execTime.Sub(t.execTime) >= 0 {
// 如果插入的优先级刚好在仅有的两个优先级之间,则中间的位置就是插入位置
return leftIndex + 1
}
middleVal := q.tasks[leftIndex+length/2].execTime
// 这里用二分法递归的方式,一直寻找正确的插入位置
if t.execTime.Sub(middleVal) <= 0 {
return q.getTaskInsertIndex(t, leftIndex, leftIndex+length/2)
} else {
return q.getTaskInsertIndex(t, leftIndex+length/2, rightIndex)
}
}
func genTaskId() string {
return primitive.NewObjectID().Hex()
}
package delay_queue
import (
"fmt"
"testing"
"time"
)
func TestDelayQueue(t *testing.T) {
q := InitDelayQueue()
for i := 0; i < 100; i++ {
go func(i int) {
id := q.Push(time.Duration(i)*time.Second, func() {
fmt.Printf("%d秒后执行...\n", i)
return
})
if i%7 == 0 {
q.Delete(id)
}
}(i)
}
time.Sleep(time.Hour)
}
小结
위 내용은 golang이 지연된 작업을 구현하는 방법에 대한 간략한 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!