Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于大量任务 #10

Open
fero2004 opened this issue Sep 18, 2023 · 1 comment
Open

关于大量任务 #10

fero2004 opened this issue Sep 18, 2023 · 1 comment

Comments

@fero2004
Copy link

fero2004 commented Sep 18, 2023

func (q *DelayQueue) SendScheduleMsgs(payloads []string, t []time.Time, opts ...interface{}) error {
	retryCount := q.defaultRetryCount
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			q.msgTTL = time.Duration(o)
		}
	}
	pipe := q.redisCli.TxPipeline() // 这里是在warpper里添加的
	now := time.Now()
	ctx := context.Background()
	for i := 0; i < len(t); i++ {
		idStr := uuid.Must(uuid.NewRandom()).String()
		msgTTL := t[i].Sub(now) + q.msgTTL
		pipe.Set(ctx, q.genMsgKey(idStr), payloads[i], msgTTL)
		pipe.HSet(ctx, q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
		values := map[string]float64{idStr: float64(t[i].Unix())}
		var zs []redis.Z
		for member, score := range values {
			zs = append(zs, redis.Z{
				Score:  score,
				Member: member,
			})
		}
		pipe.ZAdd(ctx, q.pendingKey, zs...)
	}
	_, err := pipe.Exec(ctx)
	if err != nil {
		return fmt.Errorf("push to pending failed: %v", err)
	}
	return nil
}

就不提pr了,作者看下,这样写是否有问题.如果可行的话可以手动修改下

@HDT3213
Copy link
Owner

HDT3213 commented Sep 18, 2023

用普通的 pipeline 就行了, 不需要用事务

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants