Skip to content

fix(txpipeline): keyless commands should take the slot of the keyed #3411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 24, 2025
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,8 +1519,36 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
return err
}

cmdsMap := c.mapCmdsBySlot(cmds)
cmdsMap := map[int][]Cmder{}
slot := -1
// split keyed and keyless commands
keyedCmds, _ := c.keyedAndKeyessCmds(cmds)
if len(keyedCmds) == 0 {
// no keyed commands try random slot
slot = hashtag.RandomSlot()
} else {
// keyed commands, get slot from them
// if more than one slot, return cross slot error
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
if len(cmdsBySlot) > 1 {
// cross slot error, we have more than one slot for keyed commands
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
}
// get the slot, should be only one
for sl := range cmdsBySlot {
slot = sl
break
}
}
// slot was not determined, try random one
if slot == -1 {
slot = hashtag.RandomSlot()
}
cmdsMap[slot] = cmds

// TxPipeline does not support cross slot transaction.
// double check the commands are in the same slot
if len(cmdsMap) > 1 {
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
Expand Down Expand Up @@ -1572,6 +1600,18 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
}
return cmdsMap
}
func (c *ClusterClient) keyedAndKeyessCmds(cmds []Cmder) ([]Cmder, []Cmder) {
keyedCmds := make([]Cmder, 0, len(cmds))
keylessCmds := make([]Cmder, 0, len(cmds))
for _, cmd := range cmds {
if cmdFirstKeyPos(cmd) == 0 {
keylessCmds = append(keylessCmds, cmd)
} else {
keyedCmds = append(keyedCmds, cmd)
}
}
return keyedCmds, keylessCmds
}

func (c *ClusterClient) processTxPipelineNode(
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
Expand Down
Loading