package valkey

import (
	"context"
	"errors"
	"io"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/valkey-io/valkey-go/internal/cmds"
	"github.com/valkey-io/valkey-go/internal/util"
)

// ErrNoSlot indicates that no valkey node owns the key slot.
var ErrNoSlot = errors.New("the slot has no valkey node")
var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0")
var ErrReplicaOnlyConflictWithReplicaSelector = errors.New("ReplicaOnly conflicts with ReplicaSelector option")
var ErrSendToReplicasNotSet = errors.New("SendToReplicas must be set when ReplicaSelector is set")

type clusterClient struct {
	pslots       [16384]conn
	retryHandler retryHandler
	opt          *ClientOption
	rOpt         *ClientOption
	conns        map[string]connrole
	connFn       connFn
	stopCh       chan struct{}
	sc           call
	rslots       []conn
	mu           sync.RWMutex
	stop         uint32
	cmd          Builder
	retry        bool
}

// NOTE: connrole and conn must be initialized at the same time
type connrole struct {
	conn   conn
	hidden bool
	//replica bool <- this field is removed because a server may have mixed roles at the same time in the future. https://github.com/valkey-io/valkey/issues/1372
}

var replicaOnlySelector = func(_ uint16, replicas []ReplicaInfo) int {
	return util.FastRand(len(replicas))
}

func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) {
	client := &clusterClient{
		cmd:          cmds.NewBuilder(cmds.InitSlot),
		connFn:       connFn,
		opt:          opt,
		conns:        make(map[string]connrole),
		retry:        !opt.DisableRetry,
		retryHandler: retryer,
		stopCh:       make(chan struct{}),
	}

	if opt.ReplicaOnly && opt.SendToReplicas != nil {
		return nil, ErrReplicaOnlyConflict
	}
	if opt.ReplicaOnly && opt.ReplicaSelector != nil {
		return nil, ErrReplicaOnlyConflictWithReplicaSelector
	}
	if opt.ReplicaSelector != nil && opt.SendToReplicas == nil {
		return nil, ErrSendToReplicasNotSet
	}

	if opt.SendToReplicas != nil && opt.ReplicaSelector == nil {
		opt.ReplicaSelector = replicaOnlySelector
	}

	if opt.SendToReplicas != nil {
		rOpt := *opt
		rOpt.ReplicaOnly = true
		client.rOpt = &rOpt
	}

	client.connFn = func(dst string, opt *ClientOption) conn {
		cc := connFn(dst, opt)
		cc.SetOnCloseHook(func(err error) {
			client.lazyRefresh()
		})
		return cc
	}

	if err := client.init(); err != nil {
		return nil, err
	}

	if err := client.refresh(context.Background()); err != nil {
		return client, err
	}

	if opt.ClusterOption.ShardsRefreshInterval > 0 {
		go client.runClusterTopologyRefreshment()
	} else if opt.ClusterOption.ShardsRefreshInterval < 0 {
		return nil, ErrInvalidShardsRefreshInterval
	}

	return client, nil
}

func (c *clusterClient) init() error {
	if len(c.opt.InitAddress) == 0 {
		return ErrNoAddr
	}
	results := make(chan error, len(c.opt.InitAddress))
	for _, addr := range c.opt.InitAddress {
		cc := c.connFn(addr, c.opt)
		go func(addr string, cc conn) {
			if err := cc.Dial(); err == nil {
				c.mu.Lock()
				if _, ok := c.conns[addr]; ok {
					go cc.Close() // abort the new connection instead of closing the old one, which may already be in use
				} else {
					c.conns[addr] = connrole{
						conn: cc,
					}
				}
				c.mu.Unlock()
				results <- nil
			} else {
				results <- err
			}
		}(addr, cc)
	}
	es := make([]error, cap(results))
	for i := 0; i < cap(results); i++ {
		if err := <-results; err == nil {
			return nil
		} else {
			es[i] = err
		}
	}
	return es[0]
}

func (c *clusterClient) refresh(ctx context.Context) (err error) {
	return c.sc.Do(ctx, c._refresh)
}

func (c *clusterClient) lazyRefresh() {
	c.sc.LazyDo(time.Second, c._refresh)
}

type clusterslots struct {
	addr  string
	reply ValkeyResult
	ver   int
}

func (s clusterslots) parse(tls bool) map[string]group {
	if s.ver < 8 {
		return parseSlots(s.reply.val, s.addr)
	}
	return parseShards(s.reply.val, s.addr, tls)
}

func getClusterSlots(c conn, timeout time.Duration) clusterslots {
	var ctx context.Context
	var cancel context.CancelFunc
	if timeout > 0 {
		ctx, cancel = context.WithTimeout(context.Background(), timeout)
		defer cancel()
	} else {
		ctx = context.Background()
	}
	v := c.Version()
	if v < 8 {
		return clusterslots{reply: c.Do(ctx, cmds.SlotCmd), addr: c.Addr(), ver: v}
	}
	return clusterslots{reply: c.Do(ctx, cmds.ShardsCmd), addr: c.Addr(), ver: v}
}

func (c *clusterClient) _refresh() (err error) {
	c.mu.RLock()
	results := make(chan clusterslots, len(c.conns))
	pending := make([]conn, 0, len(c.conns))
	for _, cc := range c.conns {
		pending = append(pending, cc.conn)
	}
	c.mu.RUnlock()

	var result clusterslots
	for i := 0; i < cap(results); i++ {
		if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections
			for j := i; j < i+4 && j < len(pending); j++ {
				go func(c conn, timeout time.Duration) {
					results <- getClusterSlots(c, timeout)
				}(pending[j], c.opt.ConnWriteTimeout)
			}
		}
		result = <-results
		err = result.reply.Error()
		if len(result.reply.val.values) != 0 {
			break
		}
	}
	if err != nil {
		return err
	}
	pending = nil

	groups := result.parse(c.opt.TLSConfig != nil)
	conns := make(map[string]connrole, len(groups))
	for master, g := range groups {
		conns[master] = connrole{conn: c.connFn(master, c.opt)}
		if c.rOpt != nil {
			for _, nodeInfo := range g.nodes[1:] {
				conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.rOpt)}
			}
		} else {
			for _, nodeInfo := range g.nodes[1:] {
				conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.opt)}
			}
		}
	}
	// make sure InitAddress is always present
	for _, addr := range c.opt.InitAddress {
		if _, ok := conns[addr]; !ok {
			conns[addr] = connrole{
				conn:   c.connFn(addr, c.opt),
				hidden: true,
			}
		}
	}

	var removes []conn

	c.mu.RLock()
	for addr, cc := range c.conns {
		if fresh, ok := conns[addr]; ok {
			fresh.conn = cc.conn
			conns[addr] = fresh
		} else {
			removes = append(removes, cc.conn)
		}
	}
	c.mu.RUnlock()

	pslots := [16384]conn{}
	var rslots []conn
	for master, g := range groups {
		switch {
		case c.opt.ReplicaOnly && len(g.nodes) > 1:
			nodesCount := len(g.nodes)
			for _, slot := range g.slots {
				for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
					pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn
				}
			}
		case c.rOpt != nil:
			if len(rslots) == 0 { // lazy init
				rslots = make([]conn, 16384)
			}
			if len(g.nodes) > 1 {
				n := len(g.nodes) - 1
				if c.opt.EnableReplicaAZInfo {
					var wg sync.WaitGroup
					for i := 1; i <= n; i += 4 { // batch AZ() for every 4 connections
						for j := i; j < i+4 && j <= n; j++ {
							wg.Add(1)
							go func(wg *sync.WaitGroup, conn conn, info *ReplicaInfo) {
								info.AZ = conn.AZ()
								wg.Done()
							}(&wg, conns[g.nodes[j].Addr].conn, &g.nodes[j])
						}
						wg.Wait()
					}
				}
				for _, slot := range g.slots {
					for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
						pslots[i] = conns[master].conn
						rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:])
						if rIndex >= 0 && rIndex < n {
							rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
						} else {
							rslots[i] = conns[master].conn
						}
					}
				}
			} else {
				for _, slot := range g.slots {
					for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
						pslots[i] = conns[master].conn
						rslots[i] = conns[master].conn
					}
				}
			}
		default:
			for _, slot := range g.slots {
				for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
					pslots[i] = conns[master].conn
				}
			}
		}
	}

	c.mu.Lock()
	c.pslots = pslots
	c.rslots = rslots
	c.conns = conns
	c.mu.Unlock()

	if len(removes) > 0 {
		go func(removes []conn) {
			time.Sleep(time.Second * 5)
			for _, cc := range removes {
				cc.Close()
			}
		}(removes)
	}

	return nil
}

func (c *clusterClient) single() (conn conn) {
	return c._pick(cmds.InitSlot, false)
}

func (c *clusterClient) nodes() []string {
	c.mu.RLock()
	nodes := make([]string, 0, len(c.conns))
	for addr := range c.conns {
		nodes = append(nodes, addr)
	}
	c.mu.RUnlock()
	return nodes
}

type nodes []ReplicaInfo

type group struct {
	nodes nodes
	slots [][2]int64
}

func parseEndpoint(fallback, endpoint string, port int64) string {
	switch endpoint {
	case "":
		endpoint, _, _ = net.SplitHostPort(fallback)
	case "?":
		return ""
	}
	return net.JoinHostPort(endpoint, strconv.FormatInt(port, 10))
}

// parseSlots - map valkey slots for each valkey node/address
// defaultAddr is needed in case the node does not know its own IP
func parseSlots(slots ValkeyMessage, defaultAddr string) map[string]group {
	groups := make(map[string]group, len(slots.values))
	for _, v := range slots.values {
		master := parseEndpoint(defaultAddr, v.values[2].values[0].string, v.values[2].values[1].integer)
		if master == "" {
			continue
		}
		g, ok := groups[master]
		if !ok {
			g.slots = make([][2]int64, 0)
			g.nodes = make(nodes, 0, len(v.values)-2)
			for i := 2; i < len(v.values); i++ {
				if dst := parseEndpoint(defaultAddr, v.values[i].values[0].string, v.values[i].values[1].integer); dst != "" {
					g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
				}
			}
		}
		g.slots = append(g.slots, [2]int64{v.values[0].integer, v.values[1].integer})
		groups[master] = g
	}
	return groups
}

// parseShards - map valkey shards for each valkey node/address
// defaultAddr is needed in case the node does not know its own IP
func parseShards(shards ValkeyMessage, defaultAddr string, tls bool) map[string]group {
	groups := make(map[string]group, len(shards.values))
	for _, v := range shards.values {
		m := -1
		shard, _ := v.AsMap()
		slots := shard["slots"].values
		_nodes := shard["nodes"].values
		g := group{
			nodes: make(nodes, 0, len(_nodes)),
			slots: make([][2]int64, len(slots)/2),
		}
		for i := range g.slots {
			g.slots[i][0], _ = slots[i*2].AsInt64()
			g.slots[i][1], _ = slots[i*2+1].AsInt64()
		}
		for _, n := range _nodes {
			dict, _ := n.AsMap()
			if dict["health"].string != "online" {
				continue
			}
			port := dict["port"].integer
			if tls && dict["tls-port"].integer > 0 {
				port = dict["tls-port"].integer
			}
			if dst := parseEndpoint(defaultAddr, dict["endpoint"].string, port); dst != "" {
				if dict["role"].string == "master" {
					m = len(g.nodes)
				}
				g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
			}
		}
		if m >= 0 {
			g.nodes[0], g.nodes[m] = g.nodes[m], g.nodes[0]
			groups[g.nodes[0].Addr] = g
		}
	}
	return groups
}

func (c *clusterClient) runClusterTopologyRefreshment() {
	ticker := time.NewTicker(c.opt.ClusterOption.ShardsRefreshInterval)
	defer ticker.Stop()
	for {
		select {
		case <-c.stopCh:
			return
		case <-ticker.C:
			c.lazyRefresh()
		}
	}
}

func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
	c.mu.RLock()
	if slot == cmds.InitSlot {
		for _, cc := range c.conns {
			p = cc.conn
			break
		}
	} else if toReplica && c.rslots != nil {
		p = c.rslots[slot]
	} else {
		p = c.pslots[slot]
	}
	c.mu.RUnlock()
	return p
}

func (c *clusterClient) pick(ctx context.Context, slot uint16, toReplica bool) (p conn, err error) {
	if p = c._pick(slot, toReplica); p == nil {
		if err := c.refresh(ctx); err != nil {
			return nil, err
		}
		if p = c._pick(slot, toReplica); p == nil {
			return nil, ErrNoSlot
		}
	}
	return p, nil
}

func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode RedirectMode) conn {
	c.mu.RLock()
	cc := c.conns[addr]
	c.mu.RUnlock()
	if cc.conn != nil && prev != cc.conn {
		return cc.conn
	}
	c.mu.Lock()
	if cc = c.conns[addr]; cc.conn == nil {
		p := c.connFn(addr, c.opt)
		cc = connrole{conn: p}
		c.conns[addr] = cc
		if mode == RedirectMove {
			c.pslots[slot] = p
		}
	} else if prev == cc.conn {
		// try reconnection if the MOVED redirects to the same host,
		// because the same hostname may actually be resolved into another destination
		// depending on the fail-over implementation. ex: AWS MemoryDB's resize process.
		go func(prev conn) {
			time.Sleep(time.Second * 5)
			prev.Close()
		}(prev)
		p := c.connFn(addr, c.opt)
		cc = connrole{conn: p}
		c.conns[addr] = cc
		if mode == RedirectMove { // MOVED should always point to the primary.
			c.pslots[slot] = p
		}
	}
	c.mu.Unlock()
	return cc.conn
}

func (c *clusterClient) B() Builder {
	return c.cmd
}

func (c *clusterClient) Do(ctx context.Context, cmd Completed) (resp ValkeyResult) {
	if resp = c.do(ctx, cmd); resp.NonValkeyError() == nil { // do not recycle cmds on error, since they may be used later in a pipe; consider recycling them via the pipe
		cmds.PutCompleted(cmd)
	}
	return resp
}

func (c *clusterClient) do(ctx context.Context, cmd Completed) (resp ValkeyResult) {
	attempts := 1
retry:
	cc, err := c.pick(ctx, cmd.Slot(), c.toReplica(cmd))
	if err != nil {
		return newErrResult(err)
	}
	resp = cc.Do(ctx, cmd)
process:
	switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
	case RedirectMove:
		resp = c.redirectOrNew(addr, cc, cmd.Slot(), mode).Do(ctx, cmd)
		goto process
	case RedirectAsk:
		results := c.redirectOrNew(addr, cc, cmd.Slot(), mode).DoMulti(ctx, cmds.AskingCmd, cmd)
		resp = results.s[1]
		resultsp.Put(results)
		goto process
	case RedirectRetry:
		if c.retry && cmd.IsReadOnly() {
			shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error())
			if shouldRetry {
				attempts++
				goto retry
			}
		}
	}
	return resp
}

func (c *clusterClient) toReplica(cmd Completed) bool {
	if c.opt.SendToReplicas != nil {
		return c.opt.SendToReplicas(cmd)
	}
	return false
}

func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init bool) {
	last := cmds.InitSlot

	for _, cmd := range multi {
		if cmd.Slot() == cmds.InitSlot {
			init = true
			break
		}
	}

	c.mu.RLock()
	defer c.mu.RUnlock()

	count := conncountp.Get(len(c.conns), len(c.conns))
	if !init && c.rslots != nil && c.opt.SendToReplicas != nil {
		for _, cmd := range multi {
			var cc conn
			if c.opt.SendToReplicas(cmd) {
				cc = c.rslots[cmd.Slot()]
			} else {
				cc = c.pslots[cmd.Slot()]
			}
			if cc == nil {
				return nil, false
			}
			count.m[cc]++
		}

		retries = connretryp.Get(len(count.m), len(count.m))
		for cc, n := range count.m {
			retries.m[cc] = retryp.Get(0, n)
		}
		conncountp.Put(count)

		for i, cmd := range multi {
			var cc conn
			if c.opt.SendToReplicas(cmd) {
				cc = c.rslots[cmd.Slot()]
			} else {
				cc = c.pslots[cmd.Slot()]
			}
			if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
				return nil, false
			}
			re := retries.m[cc]
			re.commands = append(re.commands, cmd)
			re.cIndexes = append(re.cIndexes, i)
		}
		return retries, init
	}

	inits := 0
	for _, cmd := range multi {
		if cmd.Slot() == cmds.InitSlot {
			inits++
			continue
		}
		if last == cmds.InitSlot {
			last = cmd.Slot()
		} else if init && last != cmd.Slot() {
			panic(panicMixCxSlot)
		}
		cc := c.pslots[cmd.Slot()]
		if cc == nil {
			return nil, false
		}
		count.m[cc]++
	}

	if last == cmds.InitSlot { // if all commands have no slots, such as INFO, we pick a non-nil slot.
		for i, cc := range c.pslots {
			if cc != nil {
				last = uint16(i)
				count.m[cc] = inits
				break
			}
		}
		if last == cmds.InitSlot {
			return nil, false
		}
	} else if init {
		cc := c.pslots[last]
		count.m[cc] += inits
	}

	retries = connretryp.Get(len(count.m), len(count.m))
	for cc, n := range count.m {
		retries.m[cc] = retryp.Get(0, n)
	}
	conncountp.Put(count)

	for i, cmd := range multi {
		var cc conn
		if cmd.Slot() != cmds.InitSlot {
			cc = c.pslots[cmd.Slot()]
		} else {
			cc = c.pslots[last]
		}
		re := retries.m[cc]
		re.commands = append(re.commands, cmd)
		re.cIndexes = append(re.cIndexes, i)
	}
	return retries, init
}

func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, bool, error) {
	conns, hasInit := c._pickMulti(multi)
	if conns == nil {
		if err := c.refresh(ctx); err != nil {
			return nil, false, err
		}
		if conns, hasInit = c._pickMulti(multi); conns == nil {
			return nil, false, ErrNoSlot
		}
	}
	return conns, hasInit, nil
}

func isMulti(cmd Completed) bool {
	return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "MULTI"
}

func isExec(cmd Completed) bool {
	return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "EXEC"
}

func (c *clusterClient) doresultfn(
	ctx context.Context, results *valkeyresults, retries *connretry, mu *sync.Mutex, cc conn,
	cIndexes []int, commands []Completed, resps []ValkeyResult, attempts int, hasInit bool,
) (clean bool) {
	mi := -1
	ei := -1
	clean = true
	for i, resp := range resps {
		clean = clean && resp.NonValkeyError() == nil
		ii := cIndexes[i]
		cm := commands[i]
		results.s[ii] = resp
		addr, mode := c.shouldRefreshRetry(resp.Error(), ctx)
		if mode != RedirectNone {
			nc := cc
			retryDelay := time.Duration(-1)
			if mode == RedirectRetry {
				if !c.retry || !cm.IsReadOnly() {
					continue
				}
				retryDelay = c.retryHandler.RetryDelay(attempts, cm, resp.Error())
			} else {
				nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
			}
			if hasInit && ei < i { // find out if there is a transaction block or not.
				for mi = i; mi >= 0 && !isMulti(commands[mi]) && !isExec(commands[mi]); mi-- {
				}
				for ei = i; ei < len(commands) && !isMulti(commands[ei]) && !isExec(commands[ei]); ei++ {
				}
				if mi >= 0 && ei < len(commands) && isMulti(commands[mi]) && isExec(commands[ei]) && resps[mi].val.string == ok { // a transaction is found.
					mu.Lock()
					retries.Redirects++
					nr := retries.m[nc]
					if nr == nil {
						nr = retryp.Get(0, len(commands))
						retries.m[nc] = nr
					}
					for i := mi; i <= ei; i++ {
						ii := cIndexes[i]
						cm := commands[i]
						if mode == RedirectAsk {
							nr.aIndexes = append(nr.aIndexes, ii)
							nr.cAskings = append(nr.cAskings, cm)
						} else {
							nr.cIndexes = append(nr.cIndexes, ii)
							nr.commands = append(nr.commands, cm)
						}
					}
					mu.Unlock()
					continue // the transaction has been added to the retries, go to the next cmd.
				}
			}
			if hasInit && mi < i && i < ei && mi >= 0 && isMulti(commands[mi]) {
				continue // the current cmd is in the processed transaction and has been added to the retries.
			}
			mu.Lock()
			if mode != RedirectRetry {
				retries.Redirects++
			}
			if mode == RedirectRetry && retryDelay >= 0 {
				retries.RetryDelay = max(retries.RetryDelay, retryDelay)
			}
			nr := retries.m[nc]
			if nr == nil {
				nr = retryp.Get(0, len(commands))
				retries.m[nc] = nr
			}
			if mode == RedirectAsk {
				nr.aIndexes = append(nr.aIndexes, ii)
				nr.cAskings = append(nr.cAskings, cm)
			} else {
				nr.cIndexes = append(nr.cIndexes, ii)
				nr.commands = append(nr.commands, cm)
			}
			mu.Unlock()
		}
	}
	return clean
}

func (c *clusterClient) doretry(
	ctx context.Context, cc conn, results *valkeyresults, retries *connretry, re *retry,
	mu *sync.Mutex, wg *sync.WaitGroup, attempts int, hasInit bool,
) {
	clean := true
	if len(re.commands) != 0 {
		resps := cc.DoMulti(ctx, re.commands...)
		clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit)
		resultsp.Put(resps)
	}
	if len(re.cAskings) != 0 {
		resps := askingMulti(cc, ctx, re.cAskings)
		clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean
		resultsp.Put(resps)
	}
	if clean {
		retryp.Put(re)
	}
	wg.Done()
}

func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []ValkeyResult {
	if len(multi) == 0 {
		return nil
	}

	retries, hasInit, err := c.pickMulti(ctx, multi)
	if err != nil {
		return fillErrs(len(multi), err)
	}
	defer connretryp.Put(retries)

	var wg sync.WaitGroup
	var mu sync.Mutex

	results := resultsp.Get(len(multi), len(multi))

	attempts := 1

retry:
	retries.RetryDelay = -1 // Assume no retry, because the client retry flag can be set to false.

	var cc1 conn
	var re1 *retry
	wg.Add(len(retries.m))
	mu.Lock()
	for cc, re := range retries.m {
		delete(retries.m, cc)
		cc1 = cc
		re1 = re
		break
	}
	for cc, re := range retries.m {
		delete(retries.m, cc)
		go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts, hasInit)
	}
	mu.Unlock()
	c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts, hasInit)
	wg.Wait()

	if len(retries.m) != 0 {
		if retries.Redirects > 0 {
			retries.Redirects = 0
			goto retry
		}
		if retries.RetryDelay >= 0 {
			c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
			attempts++
			goto retry
		}
	}

	for i, cmd := range multi {
		if results.s[i].NonValkeyError() == nil {
			cmds.PutCompleted(cmd)
		}
	}
	return results.s
}

func fillErrs(n int, err error) (results []ValkeyResult) {
	results = resultsp.Get(n, n).s
	for i := range results {
		results[i] = newErrResult(err)
	}
	return results
}

func (c *clusterClient) doCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp ValkeyResult) {
	attempts := 1
retry:
	cc, err := c.pick(ctx, cmd.Slot(), c.toReplica(Completed(cmd)))
	if err != nil {
		return newErrResult(err)
	}
	resp = cc.DoCache(ctx, cmd, ttl)
process:
	switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
	case RedirectMove:
		resp = c.redirectOrNew(addr, cc, cmd.Slot(), mode).DoCache(ctx, cmd, ttl)
		goto process
	case RedirectAsk:
		results := askingMultiCache(c.redirectOrNew(addr, cc, cmd.Slot(), mode), ctx, []CacheableTTL{CT(cmd, ttl)})
		resp = results.s[0]
		resultsp.Put(results)
		goto process
	case RedirectRetry:
		if c.retry {
			shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
			if shouldRetry {
				attempts++
				goto retry
			}
		}
	}
	return resp
}

func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp ValkeyResult) {
	resp = c.doCache(ctx, cmd, ttl)
	if err := resp.NonValkeyError(); err == nil || err == ErrDoCacheAborted {
		cmds.PutCacheable(cmd)
	}
	return resp
}

func askingMulti(cc conn, ctx context.Context, multi []Completed) *valkeyresults {
	var inTx bool
	commands := make([]Completed, 0, len(multi)*2)
	for _, cmd := range multi {
		if inTx {
			commands = append(commands, cmd)
			inTx = !isExec(cmd)
		} else {
			commands = append(commands, cmds.AskingCmd, cmd)
			inTx = isMulti(cmd)
		}
	}
	results := resultsp.Get(0, len(multi))
	resps := cc.DoMulti(ctx, commands...)
	for i, resp := range resps.s {
		if commands[i] != cmds.AskingCmd {
			results.s = append(results.s, resp)
		}
	}
	resultsp.Put(resps)
	return results
}

func askingMultiCache(cc conn, ctx context.Context, multi []CacheableTTL) *valkeyresults {
	commands := make([]Completed, 0, len(multi)*6)
	for _, cmd := range multi {
		ck, _ := cmds.CacheKey(cmd.Cmd)
		commands = append(commands, cmds.OptInCmd, cmds.AskingCmd, cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(cmd.Cmd), cmds.ExecCmd)
	}
	results := resultsp.Get(0, len(multi))
	resps := cc.DoMulti(ctx, commands...)
	for i := 5; i < len(resps.s); i += 6 {
		if arr, err := resps.s[i].ToArray(); err != nil {
			if preErr := resps.s[i-1].Error(); preErr != nil { // if {Cmd} got a ValkeyError
				err = preErr
			}
			results.s = append(results.s, newErrResult(err))
		} else {
			results.s = append(results.s, newResult(arr[len(arr)-1], nil))
		}
	}
	resultsp.Put(resps)
	return results
}

func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
	c.mu.RLock()
	defer c.mu.RUnlock()

	count := conncountp.Get(len(c.conns), len(c.conns))
	if c.opt.SendToReplicas == nil || c.rslots == nil {
		for _, cmd := range multi {
			p := c.pslots[cmd.Cmd.Slot()]
			if p == nil {
				return nil
			}
			count.m[p]++
		}

		retries := connretrycachep.Get(len(count.m), len(count.m))
		for cc, n := range count.m {
			retries.m[cc] = retrycachep.Get(0, n)
		}
		conncountp.Put(count)

		for i, cmd := range multi {
			cc := c.pslots[cmd.Cmd.Slot()]
			re := retries.m[cc]
			re.commands = append(re.commands, cmd)
			re.cIndexes = append(re.cIndexes, i)
		}

		return retries
	} else {
		for _, cmd := range multi {
			var p conn
			if c.opt.SendToReplicas(Completed(cmd.Cmd)) {
				p = c.rslots[cmd.Cmd.Slot()]
			} else {
				p = c.pslots[cmd.Cmd.Slot()]
			}
			if p == nil {
				return nil
			}
			count.m[p]++
		}

		retries := connretrycachep.Get(len(count.m), len(count.m))
		for cc, n := range count.m {
			retries.m[cc] = retrycachep.Get(0, n)
		}
		conncountp.Put(count)

		for i, cmd := range multi {
			var cc conn
			if c.opt.SendToReplicas(Completed(cmd.Cmd)) {
				cc = c.rslots[cmd.Cmd.Slot()]
			} else {
				cc = c.pslots[cmd.Cmd.Slot()]
			}
			re := retries.m[cc]
			re.commands = append(re.commands, cmd)
			re.cIndexes = append(re.cIndexes, i)
		}

		return retries
	}
}

func (c *clusterClient) pickMultiCache(ctx context.Context, multi []CacheableTTL) (*connretrycache, error) {
	conns := c._pickMultiCache(multi)
	if conns == nil {
		if err := c.refresh(ctx); err != nil {
			return nil, err
		}
		if conns = c._pickMultiCache(multi); conns == nil {
			return nil, ErrNoSlot
		}
	}
	return conns, nil
}

func (c *clusterClient) resultcachefn(
	ctx context.Context, results *valkeyresults, retries *connretrycache, mu *sync.Mutex, cc conn,
	cIndexes []int, commands []CacheableTTL, resps []ValkeyResult, attempts int,
) (clean bool) {
	clean = true
	for i, resp := range resps {
		clean = clean && resp.NonValkeyError() == nil
		ii := cIndexes[i]
		cm := commands[i]
		results.s[ii] = resp
		addr, mode := c.shouldRefreshRetry(resp.Error(), ctx)
		if mode != RedirectNone {
			nc := cc
			retryDelay := time.Duration(-1)
			if mode == RedirectRetry {
				if !c.retry {
					continue
				}
				retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error())
			} else {
				nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode)
			}
			mu.Lock()
			if mode != RedirectRetry {
				retries.Redirects++
			}
			if mode == RedirectRetry && retryDelay >= 0 {
				retries.RetryDelay = max(retries.RetryDelay, retryDelay)
			}
			nr := retries.m[nc]
			if nr == nil {
				nr = retrycachep.Get(0, len(commands))
				retries.m[nc] = nr
			}
			if mode == RedirectAsk {
				nr.aIndexes = append(nr.aIndexes, ii)
				nr.cAskings = append(nr.cAskings, cm)
			} else {
				nr.cIndexes = append(nr.cIndexes, ii)
				nr.commands = append(nr.commands, cm)
			}
			mu.Unlock()
		}
	}
	return clean
}

func (c *clusterClient) doretrycache(
	ctx context.Context, cc conn, results *valkeyresults, retries *connretrycache, re *retrycache,
	mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
) {
	clean := true
	if len(re.commands) != 0 {
		resps := cc.DoMultiCache(ctx, re.commands...)
		clean = c.resultcachefn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
		resultsp.Put(resps)
	}
	if len(re.cAskings) != 0 {
		resps := askingMultiCache(cc, ctx, re.cAskings)
		clean = c.resultcachefn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
		resultsp.Put(resps)
	}
	if clean {
		retrycachep.Put(re)
	}
	wg.Done()
}

func (c *clusterClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL) []ValkeyResult {
	if len(multi) == 0 {
		return nil
	}

	retries, err := c.pickMultiCache(ctx, multi)
	if err != nil {
		return fillErrs(len(multi), err)
	}
	defer connretrycachep.Put(retries)

	var wg sync.WaitGroup
	var mu sync.Mutex

	results := resultsp.Get(len(multi), len(multi))

	attempts := 1

retry:
	retries.RetryDelay = -1 // Assume no retry, because the client retry flag can be set to false.

	var cc1 conn
	var re1 *retrycache
	wg.Add(len(retries.m))
	mu.Lock()
	for cc, re := range retries.m {
		delete(retries.m, cc)
		cc1 = cc
		re1 = re
		break
	}
	for cc, re := range retries.m {
		delete(retries.m, cc)
		go c.doretrycache(ctx, cc, results, retries, re, &mu, &wg, attempts)
	}
	mu.Unlock()
	c.doretrycache(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
	wg.Wait()

	if len(retries.m) != 0 {
		if retries.Redirects > 0 {
			retries.Redirects = 0
			goto retry
		}
		if retries.RetryDelay >= 0 {
			c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
			attempts++
			goto retry
		}
	}

	for i, cmd := range multi {
		if err := results.s[i].NonValkeyError(); err == nil || err == ErrDoCacheAborted {
			cmds.PutCacheable(cmd.Cmd)
		}
	}
	return results.s
}

func (c *clusterClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) {
	attempts := 1
retry:
	cc, err := c.pick(ctx, subscribe.Slot(), c.toReplica(subscribe))
	if err != nil {
		goto ret
	}
	err = cc.Receive(ctx, subscribe, fn)
	if _, mode := c.shouldRefreshRetry(err, ctx); c.retry && mode != RedirectNone {
		shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
		if shouldRetry {
			attempts++
			goto retry
		}
	}
ret:
	if err == nil {
		cmds.PutCompleted(subscribe)
	}
	return err
}

func (c *clusterClient) DoStream(ctx context.Context, cmd Completed) ValkeyResultStream {
	cc, err := c.pick(ctx, cmd.Slot(), c.toReplica(cmd))
	if err != nil {
		return ValkeyResultStream{e: err}
	}
	ret := cc.DoStream(ctx, cmd)
	cmds.PutCompleted(cmd)
	return ret
}

func (c *clusterClient) DoMultiStream(ctx context.Context, multi ...Completed) MultiValkeyResultStream {
	if len(multi) == 0 {
		return ValkeyResultStream{e: io.EOF}
	}
	slot := multi[0].Slot()
	repl := c.toReplica(multi[0])
	for i := 1; i < len(multi); i++ {
		if s := multi[i].Slot(); s != cmds.InitSlot {
			if slot == cmds.InitSlot {
				slot = s
			} else if slot != s {
				panic("DoMultiStream across multiple slots is not supported")
			}
		}
		repl = repl && c.toReplica(multi[i])
	}
	cc, err := c.pick(ctx, slot, repl)
	if err != nil {
		return ValkeyResultStream{e: err}
	}
	ret := cc.DoMultiStream(ctx, multi...)
	for _, cmd := range multi {
		cmds.PutCompleted(cmd)
	}
	return ret
}

func (c *clusterClient) Dedicated(fn func(DedicatedClient) error) (err error) {
	dcc := &dedicatedClusterClient{cmd: c.cmd, client: c, slot: cmds.NoSlot, retry: c.retry, retryHandler: c.retryHandler}
	err = fn(dcc)
	dcc.release()
	return err
}

func (c *clusterClient) Dedicate() (DedicatedClient, func()) {
	dcc := &dedicatedClusterClient{cmd: c.cmd, client: c, slot: cmds.NoSlot, retry: c.retry, retryHandler: c.retryHandler}
	return dcc, dcc.release
}

func (c *clusterClient) Nodes() map[string]Client {
	c.mu.RLock()
	_nodes := make(map[string]Client, len(c.conns))
	disableCache := c.opt != nil && c.opt.DisableCache
	for addr, cc := range c.conns {
		if !cc.hidden {
			_nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
		}
	}
	c.mu.RUnlock()
	return _nodes
}

func (c *clusterClient) Close() {
	if atomic.CompareAndSwapUint32(&c.stop, 0, 1) {
		close(c.stopCh)
	}
	c.mu.RLock()
	for _, cc := range c.conns {
		go cc.conn.Close()
	}
	c.mu.RUnlock()
}

func (c *clusterClient) shouldRefreshRetry(err error, ctx context.Context) (addr string, mode RedirectMode) {
	if err != nil && err != Nil && err != ErrDoCacheAborted && atomic.LoadUint32(&c.stop) == 0 {
		if err, ok := err.(*ValkeyError); ok {
			if addr, ok = err.IsMoved(); ok {
				mode = RedirectMove
			} else if addr, ok = err.IsAsk(); ok {
				mode = RedirectAsk
			} else if err.IsClusterDown() || err.IsTryAgain() || err.IsLoading() {
				mode = RedirectRetry
			}
		} else if ctx.Err() == nil {
			mode = RedirectRetry
		}
		if mode != RedirectNone {
			c.lazyRefresh()
		}
	}
	return
}

type dedicatedClusterClient struct {
	conn         conn
	wire         wire
	retryHandler retryHandler
	client       *clusterClient
	pshks        *pshks
	mu           sync.Mutex
	cmd          Builder
	slot         uint16
	retry        bool
	mark         bool
}

func (c *dedicatedClusterClient) acquire(ctx context.Context, slot uint16) (wire wire, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.mark {
		return nil, ErrDedicatedClientRecycled
	}
	if c.slot == cmds.NoSlot {
		c.slot = slot
	} else if c.slot != slot && slot != cmds.InitSlot {
		panic(panicMsgCxSlot)
	}
	if c.wire != nil {
		return c.wire, nil
	}
	if c.conn, err = c.client.pick(ctx, c.slot, false); err != nil {
		if p := c.pshks; p != nil {
			c.pshks = nil
			p.close <- err
			close(p.close)
		}
		return nil, err
	}
	c.wire = c.conn.Acquire()
	if p := c.pshks; p != nil {
		c.pshks = nil
		ch := c.wire.SetPubSubHooks(p.hooks)
		go func(ch <-chan error) {
			for e := range ch {
				p.close <- e
			}
			close(p.close)
		}(ch)
	}
	return c.wire, nil
}

func (c *dedicatedClusterClient) release() {
	c.mu.Lock()
	if !c.mark {
		if p := c.pshks; p != nil {
			c.pshks = nil
			close(p.close)
		}
		if c.wire != nil {
			c.conn.Store(c.wire)
		}
	}
	c.mark = true
	c.mu.Unlock()
}

func (c *dedicatedClusterClient) B() Builder {
	return c.cmd
}

func (c *dedicatedClusterClient) Do(ctx context.Context, cmd Completed) (resp ValkeyResult) {
	attempts := 1
retry:
	if w, err := c.acquire(ctx, cmd.Slot()); err != nil {
		resp = newErrResult(err)
	} else {
		resp = w.Do(ctx, cmd)
		switch _, mode := c.client.shouldRefreshRetry(resp.Error(), ctx); mode {
		case RedirectRetry:
			if c.retry && cmd.IsReadOnly() && w.Error() == nil {
				shouldRetry := c.retryHandler.WaitOrSkipRetry(
					ctx, attempts, cmd, resp.Error(),
				)
				if shouldRetry {
					attempts++
					goto retry
				}
			}
		}
	}
	if resp.NonValkeyError() == nil {
		cmds.PutCompleted(cmd)
	}
	return resp
}

func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...Completed) (resp []ValkeyResult) {
	if len(multi) == 0 {
		return nil
	}
	slot := chooseSlot(multi)
	if slot == cmds.NoSlot {
		panic(panicMsgCxSlot)
	}
	retryable := c.retry
	if retryable {
		retryable = allReadOnly(multi)
	}
	attempts := 1
retry:
	if w, err := c.acquire(ctx, slot); err == nil {
		resp = w.DoMulti(ctx, multi...).s
		for i, r := range resp {
			_, mode := c.client.shouldRefreshRetry(r.Error(), ctx)
			if mode == RedirectRetry && retryable && w.Error() == nil {
				shouldRetry := c.retryHandler.WaitOrSkipRetry(
					ctx, attempts, multi[i], r.Error(),
				)
				if shouldRetry {
					attempts++
					goto retry
				}
			}
			if mode != RedirectNone {
				break
			}
		}
	} else {
		resp = resultsp.Get(len(multi), len(multi)).s
		for i := range resp {
			resp[i] = newErrResult(err)
		}
	}
	for i, cmd := range multi {
		if resp[i].NonValkeyError() == nil {
			cmds.PutCompleted(cmd)
		}
	}
	return resp
}

func (c *dedicatedClusterClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) {
	var (
		w        wire
		attempts = 1
	)
retry:
	if w, err = c.acquire(ctx, subscribe.Slot()); err == nil {
		err = w.Receive(ctx, subscribe, fn)
		if _, mode := c.client.shouldRefreshRetry(err, ctx); c.retry && mode == RedirectRetry && w.Error() == nil {
			shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
			if shouldRetry {
				attempts++
				goto retry
			}
		}
	}
	if err == nil {
		cmds.PutCompleted(subscribe)
	}
	return err
}

func (c *dedicatedClusterClient) SetPubSubHooks(hooks PubSubHooks) <-chan error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.mark {
		ch := make(chan error, 1)
		ch <- ErrDedicatedClientRecycled
		return ch
	}
	if p := c.pshks; p != nil {
		c.pshks = nil
		close(p.close)
	}
	if c.wire != nil {
		return c.wire.SetPubSubHooks(hooks)
	}
	if hooks.isZero() {
		return nil
	}
	ch := make(chan error, 1)
	c.pshks = &pshks{hooks: hooks, close: ch}
	return ch
}

func (c *dedicatedClusterClient) Close() {
	c.mu.Lock()
	if p := c.pshks; p != nil {
		c.pshks = nil
		p.close <- ErrClosing
		close(p.close)
	}
	if c.wire != nil {
		c.wire.Close()
	}
	c.mu.Unlock()
	c.release()
}

type RedirectMode int

const (
	RedirectNone RedirectMode = iota
	RedirectMove
	RedirectAsk
	RedirectRetry

	panicMsgCxSlot = "cross slot command in Dedicated is prohibited"
	panicMixCxSlot = "Mixing no-slot and cross slot commands in DoMulti is prohibited"
)
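
// Illustrative sketch only: how a caller typically combines the ReplicaOnly / SendToReplicas /
// ReplicaSelector options that newClusterClient validates above. NewClient and the exact
// ClientOption field set are assumed from the public package API; addresses are placeholders.
// Returning an out-of-range index from ReplicaSelector falls back to the primary, as implemented
// in _refresh.
//
//	client, err := NewClient(ClientOption{
//		InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002"},
//		// route read-only commands to replicas; everything else goes to the primary
//		SendToReplicas: func(cmd Completed) bool { return cmd.IsReadOnly() },
//		// pick which replica serves a given slot; 0 means "first replica"
//		ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return 0 },
//	})
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close()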