Skip to content

fix(txpipeline): keyless commands should take the slot of the keyed #3411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,55 @@ import (
"github.com/redis/go-redis/v9/internal/util"
)

// keylessCommands contains Redis commands that have empty key specifications (9th slot empty)
// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands
var keylessCommands = map[string]struct{}{
"acl": {},
"asking": {},
"auth": {},
"bgrewriteaof": {},
"bgsave": {},
"client": {},
"cluster": {},
"config": {},
"debug": {},
"discard": {},
"echo": {},
"exec": {},
"failover": {},
"function": {},
"hello": {},
"latency": {},
"lolwut": {},
"module": {},
"monitor": {},
"multi": {},
"pfselftest": {},
"ping": {},
"psubscribe": {},
"psync": {},
"publish": {},
"pubsub": {},
"punsubscribe": {},
"quit": {},
"readonly": {},
"readwrite": {},
"replconf": {},
"replicaof": {},
"role": {},
"save": {},
"script": {},
"select": {},
"shutdown": {},
"slaveof": {},
"slowlog": {},
"subscribe": {},
"swapdb": {},
"sync": {},
"unsubscribe": {},
"unwatch": {},
}

type Cmder interface {
// command name.
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster".
Expand Down Expand Up @@ -75,12 +124,22 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
return wr.WriteArgs(cmd.Args())
}

// cmdFirstKeyPos returns the position of the first key in the command's arguments.
// If the command does not have a key, it returns 0.
// TODO: Use the data in CommandInfo to determine the first key position.
func cmdFirstKeyPos(cmd Cmder) int {
if pos := cmd.firstKeyPos(); pos != 0 {
return int(pos)
}

switch cmd.Name() {
name := cmd.Name()

// first check if the command is keyless
if _, ok := keylessCommands[name]; ok {
return 0
}

switch name {
case "eval", "evalsha", "eval_ro", "evalsha_ro":
if cmd.stringArg(2) != "0" {
return 3
Expand Down
11 changes: 9 additions & 2 deletions internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() {
It("select slot from args for GETKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)

slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})

It("select slot from args for COUNTKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)

slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})

It("follows preferred random slot", func() {
cmd := NewStatusCmd(ctx, "ping")

slot := client.cmdSlot(cmd, 101)
Expect(slot).To(Equal(101))
})
})
})
70 changes: 62 additions & 8 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, -1)
var node *clusterNode
var moved bool
var ask bool
Expand Down Expand Up @@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
return err
}

preferredRandomSlot := -1
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return err
Expand All @@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
}

for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := state.slotMasterNode(slot)
if err != nil {
return err
Expand Down Expand Up @@ -1519,8 +1526,36 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
return err
}

cmdsMap := c.mapCmdsBySlot(cmds)
cmdsMap := map[int][]Cmder{}
slot := -1
// get only the keyed commands
keyedCmds := c.keyedCmds(cmds)
if len(keyedCmds) == 0 {
// no keyed commands try random slot
slot = hashtag.RandomSlot()
} else {
// keyed commands, get slot from them
// if more than one slot, return cross slot error
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
if len(cmdsBySlot) > 1 {
// cross slot error, we have more than one slot for keyed commands
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
}
// get the slot, should be only one
for sl := range cmdsBySlot {
slot = sl
break
}
}
// slot was not determined, try random one
if slot == -1 {
slot = hashtag.RandomSlot()
}
cmdsMap[slot] = cmds

// TxPipeline does not support cross slot transaction.
// double check the commands are in the same slot
if len(cmdsMap) > 1 {
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
Expand Down Expand Up @@ -1566,13 +1601,29 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
cmdsMap := make(map[int][]Cmder)
preferredRandomSlot := -1
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
return cmdsMap
}

// keyedCmds returns all the keyed commands from the cmds slice
// it determines keyed commands by checking if the command has a first key position
func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder {
keyedCmds := make([]Cmder, 0, len(cmds))
for _, cmd := range cmds {
if cmdFirstKeyPos(cmd) != 0 {
keyedCmds = append(keyedCmds, cmd)
}
}
return keyedCmds
}

func (c *ClusterClient) processTxPipelineNode(
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) {
Expand Down Expand Up @@ -1885,17 +1936,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
return info
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
args := cmd.Args()
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
return args[2].(int)
}

return cmdSlot(cmd, cmdFirstKeyPos(cmd))
return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
}

func cmdSlot(cmd Cmder, pos int) int {
func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
if pos == 0 {
if preferredRandomSlot != -1 {
return preferredRandomSlot
}
return hashtag.RandomSlot()
}
firstKey := cmd.stringArg(pos)
Expand Down
9 changes: 9 additions & 0 deletions osscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() {
Expect(err).To(MatchError(redis.ErrCrossSlot))
})

It("works normally with keyless commands and no CrossSlot error", func() {
pipe.Set(ctx, "A{s}", "A_value", 0)
pipe.Ping(ctx)
pipe.Set(ctx, "B{s}", "B_value", 0)
pipe.Ping(ctx)
_, err := pipe.Exec(ctx)
Expect(err).To(Not(HaveOccurred()))
})

// doesn't fail when no commands are queued
It("returns no error when there are no commands", func() {
_, err := pipe.Exec(ctx)
Expand Down
12 changes: 6 additions & 6 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,20 +304,20 @@ var _ = Describe("Redis Ring", func() {
ring = redis.NewRing(opt)
})
It("supports Process hook", func() {
err := ring.Ping(ctx).Err()
err := ring.Set(ctx, "key", "test", 0).Err()
Expect(err).NotTo(HaveOccurred())

var stack []string

ring.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "ring.BeforeProcess")

err := hook(ctx, cmd)

Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "ring.AfterProcess")

return err
Expand All @@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() {
shard.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "shard.BeforeProcess")

err := hook(ctx, cmd)

Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "shard.AfterProcess")

return err
Expand All @@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() {
return nil
})

err = ring.Ping(ctx).Err()
err = ring.Get(ctx, "key").Err()
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcess",
Expand Down
Loading