package valkey

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// Canned cluster-topology replies handed back by mock connections in the
// tests below.
//
// The "slots" fixtures are arrays of entries shaped
// [first slot, last slot, node, node, ...], where each node is
// [host string, port integer, empty string]. The tests return them from mocks
// that do not set VersionFn, or when the command is "CLUSTER SLOTS".
//
// The "shards" fixtures are arrays of maps with a "slots" pair and a "nodes"
// list of per-node maps. The tests return them from mocks whose VersionFn
// reports 8.
//
// NOTE(review): some shard fixtures spell the outer type as '*' and others as
// typeArray; presumably these are the same value, confirm against the
// constant's definition.

// slotsResp has a single range 0-16383 with a master at 127.0.0.1 port 0 and
// a replica at 127.0.1.1 port 1.
var slotsResp = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 16383},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.0.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica
			{typ: '+', string: "127.0.1.1"},
			{typ: ':', integer: 1},
			{typ: '+', string: ""},
		}},
	}},
}}, nil)

// slotsMultiResp has two ranges, 0-8192 and 8193-16383, each with one master
// (port 0) and one replica (port 1).
var slotsMultiResp = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 8192},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.0.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica
			{typ: '+', string: "127.0.1.1"},
			{typ: ':', integer: 1},
			{typ: '+', string: ""},
		}},
	}},
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 8193},
		{typ: ':', integer: 16383},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.2.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica
			{typ: '+', string: "127.0.3.1"},
			{typ: ':', integer: 1},
			{typ: '+', string: ""},
		}},
	}},
}}, nil)

// slotsMultiRespWithoutReplicas has the same two ranges as slotsMultiResp but
// lists only a master for each (127.0.0.1 and 127.0.1.1, both port 0).
var slotsMultiRespWithoutReplicas = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 8192},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.0.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
	}},
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 8193},
		{typ: ':', integer: 16383},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.1.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
	}},
}}, nil)

// slotsMultiRespWithMultiReplicas has two ranges, each with one master
// (port 0) followed by three replicas on ports 1, 2 and 3.
var slotsMultiRespWithMultiReplicas = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 8192},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.0.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica1
			{typ: '+', string: "127.0.0.2"},
			{typ: ':', integer: 1},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica2
			{typ: '+', string: "127.0.0.3"},
			{typ: ':', integer: 2},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica3
			{typ: '+', string: "127.0.0.4"},
			{typ: ':', integer: 3},
			{typ: '+', string: ""},
		}},
	}},
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 8193},
		{typ: ':', integer: 16383},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.1.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica1
			{typ: '+', string: "127.0.1.2"},
			{typ: ':', integer: 1},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica2
			{typ: '+', string: "127.0.1.3"},
			{typ: ':', integer: 2},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica3
			{typ: '+', string: "127.0.1.4"},
			{typ: ':', integer: 3},
			{typ: '+', string: ""},
		}},
	}},
}}, nil)

// singleSlotResp covers only slot 0, with a lone master at 127.0.0.1 port 0.
var singleSlotResp = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 0},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.0.1"},
			{typ: ':', integer: 0},
			{typ: '+', string: ""},
		}},
	}},
}}, nil)

// singleSlotResp2 covers only slot 0, with a lone master at 127.0.3.1 port 3.
// The refresh tests switch to it to simulate a changed topology.
var singleSlotResp2 = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 0},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "127.0.3.1"},
			{typ: ':', integer: 3},
			{typ: '+', string: ""},
		}},
	}},
}}, nil)

// singleSlotWithoutIP has two entries for slot 0 whose hosts are not usable
// addresses: the first lists a master with an empty host (port 4) and a
// replica with host "?" (port 1); the second lists a master with host "?"
// (port 4). The test that uses it expects the client to end up with the single
// node "127.0.4.1:4", the address reported by the mock connection's AddrFn.
var singleSlotWithoutIP = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 0},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: ""},
			{typ: ':', integer: 4},
			{typ: '+', string: ""},
		}},
		{typ: '*', values: []ValkeyMessage{ // replica
			{typ: '+', string: "?"},
			{typ: ':', integer: 1},
			{typ: '+', string: ""},
		}},
	}},
	{typ: '*', values: []ValkeyMessage{
		{typ: ':', integer: 0},
		{typ: ':', integer: 0},
		{typ: '*', values: []ValkeyMessage{ // master
			{typ: '+', string: "?"},
			{typ: ':', integer: 4},
			{typ: '+', string: ""},
		}},
	}},
}}, nil)

// shardsResp has one shard for slots "0"-"16383" with three nodes: a master
// whose health is "fail" (127.0.0.99), an online master (127.0.0.1 port 0) and
// an online replica (127.0.1.1 port 1). The refresh tests do not expect the
// failed master among the client's nodes.
var shardsResp = newResult(ValkeyMessage{typ: typeArray, values: []ValkeyMessage{
	{typ: typeMap, values: []ValkeyMessage{
		{typ: typeBlobString, string: "slots"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeBlobString, string: "0"},
			{typ: typeBlobString, string: "16383"},
		}},
		{typ: typeBlobString, string: "nodes"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeMap, values: []ValkeyMessage{ // failed master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.0.99"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.0.99"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "fail"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.0.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.0.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // replica
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 1},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.1.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.1.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "replica"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
		}},
	}},
}}, nil)

// shardsRespTls has one shard whose nodes mix "port" and "tls-port" keys:
// a replica with only tls-port 2, a failed and an online master with both
// port 0 and tls-port 1, and a replica with only port 3. The "Shards tls"
// test, which sets a TLSConfig, expects 127.0.1.1:1, 127.0.2.1:2 and
// 127.0.3.1:3, i.e. the tls-port wherever one is present.
var shardsRespTls = newResult(ValkeyMessage{typ: typeArray, values: []ValkeyMessage{
	{typ: typeMap, values: []ValkeyMessage{
		{typ: typeBlobString, string: "slots"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeBlobString, string: "0"},
			{typ: typeBlobString, string: "16383"},
		}},
		{typ: typeBlobString, string: "nodes"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeMap, values: []ValkeyMessage{ // replica, tls
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "tls-port"}, {typ: typeInteger, integer: 2},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.2.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.2.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "replica"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // failed master, tls + port
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "tls-port"}, {typ: typeInteger, integer: 1},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.1.99"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.1.99"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "fail"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // master, tls + port
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "tls-port"}, {typ: typeInteger, integer: 1},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.1.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.1.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // replica, port
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 3},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.3.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.3.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "replica"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
		}},
	}},
}}, nil)

// shardsMultiResp has two shards, for slots "0"-"8192" and "8193"-"16383".
// Each lists a failed master, an online master (port 0) and an online replica
// (port 1).
var shardsMultiResp = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: typeMap, values: []ValkeyMessage{
		{typ: typeBlobString, string: "slots"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeBlobString, string: "0"},
			{typ: typeBlobString, string: "8192"},
		}},
		{typ: typeBlobString, string: "nodes"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeMap, values: []ValkeyMessage{ // failed master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.0.99"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.0.99"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "fail"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.0.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.0.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // replica
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 1},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.1.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.1.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "replica"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
		}},
	}},
	{typ: typeMap, values: []ValkeyMessage{
		{typ: typeBlobString, string: "slots"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeBlobString, string: "8193"},
			{typ: typeBlobString, string: "16383"},
		}},
		{typ: typeBlobString, string: "nodes"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeMap, values: []ValkeyMessage{ // failed master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.2.99"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.2.99"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "fail"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 0},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.2.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.2.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // replica
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 1},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.3.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.3.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "replica"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
		}},
	}},
}}, nil)

// singleShardResp2 has one shard for slot "0" with a failed master
// (127.0.3.99) and an online master (127.0.3.1), both on port 3. The refresh
// tests switch to it to simulate a changed topology.
var singleShardResp2 = newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
	{typ: typeMap, values: []ValkeyMessage{
		{typ: typeBlobString, string: "slots"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeBlobString, string: "0"},
			{typ: typeBlobString, string: "0"},
		}},
		{typ: typeBlobString, string: "nodes"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeMap, values: []ValkeyMessage{ // failed master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 3},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.3.99"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.3.99"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "fail"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 3},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "127.0.3.1"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "127.0.3.1"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
		}},
	}},
}}, nil)

// singleShardWithoutIP is the shards counterpart of singleSlotWithoutIP: two
// shards for slot "0" whose nodes carry an empty or "?" ip/endpoint. The test
// that uses it expects the client to end up with the single node
// "127.0.4.1:4", the address reported by the mock connection's AddrFn.
var singleShardWithoutIP = newResult(ValkeyMessage{typ: typeArray, values: []ValkeyMessage{
	{typ: typeMap, values: []ValkeyMessage{
		{typ: typeBlobString, string: "slots"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeBlobString, string: "0"},
			{typ: typeBlobString, string: "0"},
		}},
		{typ: typeBlobString, string: "nodes"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeMap, values: []ValkeyMessage{ // failed master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 4},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "fail"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 4},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
			{typ: typeMap, values: []ValkeyMessage{ // replica
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 1},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "?"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "?"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "replica"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
		}},
	}},
	{typ: typeMap, values: []ValkeyMessage{
		{typ: typeBlobString, string: "slots"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeBlobString, string: "0"},
			{typ: typeBlobString, string: "0"},
		}},
		{typ: typeBlobString, string: "nodes"},
		{typ: typeArray, values: []ValkeyMessage{
			{typ: typeMap, values: []ValkeyMessage{ // master
				{typ: typeBlobString, string: "id"}, {typ: typeBlobString, string: ""},
				{typ: typeBlobString, string: "port"}, {typ: typeInteger, integer: 4},
				{typ: typeBlobString, string: "ip"}, {typ: typeBlobString, string: "?"},
				{typ: typeBlobString, string: "endpoint"}, {typ: typeBlobString, string: "?"},
				{typ: typeBlobString, string: "role"}, {typ: typeBlobString, string: "master"},
				{typ: typeBlobString, string: "replication-offset"}, {typ: typeInteger, integer: 72156},
				{typ: typeBlobString, string: "health"}, {typ: typeBlobString, string: "online"},
			}},
		}},
	}},
}}, nil)

//gocyclo:ignore
func TestClusterClientInit(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	t.Run("Init no nodes", func(t *testing.T) {
		if _, err := newClusterClient(
			&ClientOption{InitAddress: []string{}},
			func(dst string, opt *ClientOption) conn { return nil },
			newRetryer(defaultRetryDelayFn),
		); err != ErrNoAddr {
			t.Fatalf("unexpected err %v", err)
		}
	})
	t.Run("Init no dialable", func(t *testing.T) {
		v := errors.New("dial err")
		if _, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{DialFn: func() error { return v }}
			},
			newRetryer(defaultRetryDelayFn),
		); err != v {
			t.Fatalf("unexpected err %v", err)
		}
	})
	t.Run("Refresh err", func(t *testing.T) {
		v := errors.New("refresh err")
		if _, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return newErrResult(v) }}
			},
newRetryer(defaultRetryDelayFn), ); err != v { t.Fatalf("unexpected err %v", err) } }) t.Run("Refresh skip zero slots", func(t *testing.T) { var first int64 if _, err := newClusterClient( &ClientOption{InitAddress: []string{"127.0.0.1:0", "127.0.1.1:1"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if atomic.AddInt64(&first, 1) == 1 { return newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{}}, nil) } return slotsResp }, } }, newRetryer(defaultRetryDelayFn), ); err != nil || atomic.AddInt64(&first, 1) < 2 { t.Fatalf("unexpected err %v", err) } }) t.Run("Refresh skip zero shards", func(t *testing.T) { var first int64 if _, err := newClusterClient( &ClientOption{InitAddress: []string{"127.0.0.1:0", "127.0.1.1:1"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if atomic.AddInt64(&first, 1) == 1 { return newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{}}, nil) } return shardsResp }, VersionFn: func() int { return 8 }, } }, newRetryer(defaultRetryDelayFn), ); err != nil || atomic.AddInt64(&first, 1) < 2 { t.Fatalf("unexpected err %v", err) } }) t.Run("Refresh no slots cluster", func(t *testing.T) { if _, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{}}, nil) }, } }, newRetryer(defaultRetryDelayFn), ); err != nil { t.Fatalf("unexpected err %v", err) } }) t.Run("Refresh no shards cluster", func(t *testing.T) { if _, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{}}, nil) }, VersionFn: func() int { return 8 }, } }, newRetryer(defaultRetryDelayFn), ); err != nil { 
t.Fatalf("unexpected err %v", err) } }) t.Run("Refresh cluster of 1 node without knowing its own ip", func(t *testing.T) { getClient := func(version int) (client *clusterClient, err error) { return newClusterClient( &ClientOption{InitAddress: []string{"127.0.4.1:4"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return singleSlotWithoutIP } return singleShardWithoutIP }, AddrFn: func() string { return "127.0.4.1:4" }, VersionFn: func() int { return version }, } }, newRetryer(defaultRetryDelayFn), ) } t.Run("slots", func(t *testing.T) { client, err := getClient(6) if err != nil { t.Fatalf("unexpected err %v", err) } nodes := client.nodes() sort.Strings(nodes) if len(nodes) != 1 || nodes[0] != "127.0.4.1:4" { t.Fatalf("unexpected nodes %v", nodes) } }) t.Run("shards", func(t *testing.T) { client, err := getClient(8) if err != nil { t.Fatalf("unexpected err %v", err) } nodes := client.nodes() sort.Strings(nodes) if len(nodes) != 1 || nodes[0] != "127.0.4.1:4" { t.Fatalf("unexpected nodes %v", nodes) } }) }) t.Run("Refresh replace", func(t *testing.T) { testFunc := func(t *testing.T, client *clusterClient, num *int64) { nodes := client.nodes() sort.Strings(nodes) if len(nodes) != 3 || nodes[0] != "127.0.0.1:0" || nodes[1] != "127.0.1.1:1" || nodes[2] != "127.0.2.1:2" { t.Fatalf("unexpected nodes %v", nodes) } atomic.AddInt64(num, 1) if err := client.refresh(context.Background()); err != nil { t.Fatalf("unexpected err %v", err) } nodes = client.nodes() sort.Strings(nodes) if len(nodes) != 3 || nodes[0] != "127.0.1.1:1" || nodes[1] != "127.0.2.1:2" || nodes[2] != "127.0.3.1:3" { t.Fatalf("unexpected nodes %v", nodes) } } t.Run("slots", func(t *testing.T) { var first int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{"127.0.1.1:1", "127.0.2.1:2"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) 
ValkeyResult { if atomic.LoadInt64(&first) == 1 { return singleSlotResp2 } return slotsResp }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } testFunc(t, client, &first) }) t.Run("shards", func(t *testing.T) { var first int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{"127.0.1.1:1", "127.0.2.1:2"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if atomic.LoadInt64(&first) == 1 { return singleShardResp2 } return shardsResp }, VersionFn: func() int { return 8 }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } testFunc(t, client, &first) }) }) t.Run("Refresh InitAddress which is not in CLUSTER SLOTS / CLUSTER SHARDS should be hidden", func(t *testing.T) { testFunc := func(t *testing.T, client *clusterClient, num *int64) { nodesWithHidden := client.nodes() sort.Strings(nodesWithHidden) if len(nodesWithHidden) != 4 || nodesWithHidden[0] != "127.0.0.1:0" || nodesWithHidden[1] != "127.0.1.1:1" || nodesWithHidden[2] != "127.0.2.1:2" || nodesWithHidden[3] != "redis.example.com" { t.Fatalf("unexpected nodes %v", nodesWithHidden) } nodes := client.Nodes() _, ok := nodes["127.0.0.1:0"] _, ok2 := nodes["127.0.1.1:1"] if len(nodes) != 2 || !ok || !ok2 { t.Fatalf("unexpected nodes %v", nodes) } atomic.AddInt64(num, 1) if err := client.refresh(context.Background()); err != nil { t.Fatalf("unexpected err %v", err) } nodesWithHidden = client.nodes() sort.Strings(nodesWithHidden) if len(nodesWithHidden) != 4 || nodesWithHidden[0] != "127.0.1.1:1" || nodesWithHidden[1] != "127.0.2.1:2" || nodesWithHidden[2] != "127.0.3.1:3" || nodesWithHidden[3] != "redis.example.com" { t.Fatalf("unexpected nodes %v", nodesWithHidden) } nodes = client.Nodes() _, ok = nodes["127.0.3.1:3"] if len(nodes) != 1 || !ok { t.Fatalf("unexpected nodes %v", nodes) } } t.Run("slots", func(t *testing.T) { var first int64 client, err := 
newClusterClient( &ClientOption{InitAddress: []string{"127.0.1.1:1", "127.0.2.1:2", "redis.example.com"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if atomic.LoadInt64(&first) == 1 { return singleSlotResp2 } return slotsResp }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } testFunc(t, client, &first) }) t.Run("shards", func(t *testing.T) { var first int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{"127.0.1.1:1", "127.0.2.1:2", "redis.example.com"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if atomic.LoadInt64(&first) == 1 { return singleShardResp2 } return shardsResp }, VersionFn: func() int { return 8 }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } testFunc(t, client, &first) }) }) t.Run("Shards tls", func(t *testing.T) { client, err := newClusterClient( &ClientOption{InitAddress: []string{"127.0.0.1:0"}, TLSConfig: &tls.Config{}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return shardsRespTls }, VersionFn: func() int { return 8 }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } nodes := client.nodes() sort.Strings(nodes) if len(nodes) != 4 || nodes[0] != "127.0.0.1:0" || nodes[1] != "127.0.1.1:1" || nodes[2] != "127.0.2.1:2" || nodes[3] != "127.0.3.1:3" { t.Fatalf("unexpected nodes %v", nodes) } }) t.Run("Refresh cluster which has only primary node per shard with SendToReplica option", func(t *testing.T) { m := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiRespWithoutReplicas } return ValkeyResult{} }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return true 
}, }, func(dst string, opt *ClientOption) conn { copiedM := *m return &copiedM }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if client.pslots[0] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 0") } if client.pslots[8192] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 8192") } if client.pslots[8193] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 8193") } if client.pslots[16383] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 16383") } if client.rslots[0] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 0") } if client.rslots[8192] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 8192") } if client.rslots[8193] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 8193") } if client.rslots[16383] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 16383") } }) t.Run("Refresh cluster which has multi nodes per shard with SendToReplica option", func(t *testing.T) { primaryNodeConn := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiResp } return ValkeyResult{ err: errors.New("unexpected call"), } }, } replicaNodeConn := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return true }, }, func(dst string, opt *ClientOption) conn { if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { if opt.ReplicaOnly { t.Fatalf("unexpected replicaOnly option in primary node") } return primaryNodeConn } else { if !opt.ReplicaOnly { t.Fatalf("unexpected replicaOnly option in replica node") } return replicaNodeConn } }, 
newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if client.pslots[0] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 0") } if client.pslots[8192] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 8192") } if client.pslots[8193] != client.conns["127.0.2.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 8193") } if client.pslots[16383] != client.conns["127.0.2.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 16383") } if client.rslots[0] != client.conns["127.0.1.1:1"].conn { t.Fatalf("unexpected node assigned to rslot 0") } if client.rslots[8192] != client.conns["127.0.1.1:1"].conn { t.Fatalf("unexpected node assigned to rslot 8192") } if client.rslots[8193] != client.conns["127.0.3.1:1"].conn { t.Fatalf("unexpected node assigned to rslot 8193") } if client.rslots[16383] != client.conns["127.0.3.1:1"].conn { t.Fatalf("unexpected node assigned to rslot 16383") } }) t.Run("Negative ShardRefreshInterval", func(t *testing.T) { _, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, ClusterOption: ClusterOption{ ShardsRefreshInterval: -1 * time.Millisecond, }, }, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return singleSlotResp }, } }, newRetryer(defaultRetryDelayFn), ) if !errors.Is(err, ErrInvalidShardsRefreshInterval) { t.Fatalf("unexpected err %v", err) } }) t.Run("Refresh cluster which has only primary node per shard with ReplicaSelector option", func(t *testing.T) { m := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiRespWithoutReplicas } return ValkeyResult{} }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return true }, ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return 0 }, }, 
func(dst string, opt *ClientOption) conn { copiedM := *m return &copiedM }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if client.pslots[0] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 0") } if client.pslots[8192] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 8192") } if client.pslots[8193] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 8193") } if client.pslots[16383] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to pslot 16383") } if client.rslots[0] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 0") } if client.rslots[8192] != client.conns["127.0.0.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 8192") } if client.rslots[8193] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 8193") } if client.rslots[16383] != client.conns["127.0.1.1:0"].conn { t.Fatalf("unexpected node assigned to rslot 16383") } }) t.Run("Refresh cluster which has multi replicas per shard with ReplicaSelector option. 
Returned index is within range", func(t *testing.T) { primaryNodeConn := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiRespWithMultiReplicas } return ValkeyResult{ err: errors.New("unexpected call"), } }, } replicaNodeConn1 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, } replicaNodeConn2 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, } replicaNodeConn3 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return true }, ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return 1 }, }, func(dst string, opt *ClientOption) conn { switch { case dst == "127.0.0.2:1" || dst == "127.0.1.2:1": return replicaNodeConn1 case dst == "127.0.0.3:2" || dst == "127.0.1.3:2": return replicaNodeConn2 case dst == "127.0.0.4:3" || dst == "127.0.1.4:3": return replicaNodeConn3 default: return primaryNodeConn } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if client.pslots[0] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 0") } if client.pslots[8192] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 8192") } if client.pslots[8193] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 8193") } if client.pslots[16383] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 16383") } if client.rslots[0] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 0") } if client.rslots[8192] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 8192") } if client.rslots[8193] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 8193") } if 
client.rslots[16383] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 16383") } }) t.Run("Refresh cluster which has multi replicas per shard with ReplicaSelector option. Returned index is out of range", func(t *testing.T) { primaryNodeConn := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiRespWithMultiReplicas } return ValkeyResult{ err: errors.New("unexpected call"), } }, } replicaNodeConn1 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, } replicaNodeConn2 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, } replicaNodeConn3 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return true }, ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return -1 }, }, func(dst string, opt *ClientOption) conn { switch { case dst == "127.0.0.2:1" || dst == "127.0.1.2:1": return replicaNodeConn1 case dst == "127.0.0.3:2" || dst == "127.0.1.3:2": return replicaNodeConn2 case dst == "127.0.0.4:3" || dst == "127.0.1.4:3": return replicaNodeConn3 default: return primaryNodeConn } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if client.pslots[0] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 0") } if client.pslots[8192] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 8192") } if client.pslots[8193] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 8193") } if client.pslots[16383] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 16383") } if client.rslots[0] != primaryNodeConn { t.Fatalf("unexpected node assigned to rslot 0") } if 
client.rslots[8192] != primaryNodeConn { t.Fatalf("unexpected node assigned to rslot 8192") } if client.rslots[8193] != primaryNodeConn { t.Fatalf("unexpected node assigned to rslot 8193") } if client.rslots[16383] != primaryNodeConn { t.Fatalf("unexpected node assigned to rslot 16383") } }) t.Run("Refresh cluster which has multi replicas with az", func(t *testing.T) { primaryNodeConn := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiRespWithMultiReplicas } return ValkeyResult{ err: errors.New("unexpected call"), } }, AZFn: func() string { return "us-west-1a" }, } replicaNodeConn1 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, AZFn: func() string { return "us-west-1a" }, } replicaNodeConn2 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, AZFn: func() string { return "us-west-1b" }, } replicaNodeConn3 := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return ValkeyResult{ err: errors.New("unexpected call"), } }, AZFn: func() string { return "us-west-1c" }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return true }, ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { for i, replica := range replicas { if replica.AZ == "us-west-1b" { return i } } return -1 }, EnableReplicaAZInfo: true, }, func(dst string, opt *ClientOption) conn { switch { case dst == "127.0.0.2:1" || dst == "127.0.1.2:1": return replicaNodeConn1 case dst == "127.0.0.3:2" || dst == "127.0.1.3:2": return replicaNodeConn2 case dst == "127.0.0.4:3" || dst == "127.0.1.4:3": return replicaNodeConn3 default: return primaryNodeConn } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if client.pslots[0] != primaryNodeConn { t.Fatalf("unexpected node 
assigned to pslot 0") } if client.pslots[8192] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 8192") } if client.pslots[8193] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 8193") } if client.pslots[16383] != primaryNodeConn { t.Fatalf("unexpected node assigned to pslot 16383") } if client.rslots[0] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 0") } if client.rslots[8192] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 8192") } if client.rslots[8193] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 8193") } if client.rslots[16383] != replicaNodeConn2 { t.Fatalf("unexpected node assigned to rslot 16383") } }) }

// TestClusterClient builds a cluster client whose every node connection is the
// same mockConn (which answers "CLUSTER SLOTS" with slotsMultiResp) and checks,
// one subtest per entry point, that the client forwards calls to that
// connection or to the wire it hands out for dedicated use.
//
// The subtests share `client` and `m` and mutate the hooks on `m`, so they are
// order dependent; in particular, "Delegate Close" closes the client before
// the Dedicated/Dedicate subtests run.
//
//gocyclo:ignore
func TestClusterClient(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	m := &mockConn{
		DoFn: func(cmd Completed) ValkeyResult {
			if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
				return slotsMultiResp
			}
			return ValkeyResult{}
		},
		// The stream mocks echo the second command token back as an error so
		// subtests can tell which command reached the connection.
		DoStreamFn: func(cmd Completed) ValkeyResultStream {
			return ValkeyResultStream{e: errors.New(cmd.Commands()[1])}
		},
		DoMultiFn: func(multi ...Completed) *valkeyresults {
			resps := make([]ValkeyResult, len(multi))
			for i, cmd := range multi {
				resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil)
			}
			return &valkeyresults{s: resps}
		},
		DoMultiStreamFn: func(cmd ...Completed) MultiValkeyResultStream {
			return MultiValkeyResultStream{e: errors.New(cmd[0].Commands()[1])}
		},
		DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults {
			resps := make([]ValkeyResult, len(multi))
			for i, cmd := range multi {
				resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil)
			}
			return &valkeyresults{s: resps}
		},
		DoOverride: map[string]func(cmd Completed) ValkeyResult{
			"GET Do": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "Do"}, nil)
			},
			"INFO": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "Info"}, nil)
			},
		},
		DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) ValkeyResult{
			"GET DoCache": func(cmd Cacheable, ttl time.Duration) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "DoCache"}, nil)
			},
		},
	}

	client, err := newClusterClient(
		&ClientOption{InitAddress: []string{"127.0.0.1:0"}},
		func(dst string, opt *ClientOption) conn { return m },
		newRetryer(defaultRetryDelayFn),
	)
	if err != nil {
		t.Fatalf("unexpected err %v", err)
	}

	// newDelegateWire returns the mock wire shared by the two delegate
	// subtests below; onClose is invoked from the wire's CloseFn.
	newDelegateWire := func(onClose func()) *mockWire {
		return &mockWire{
			DoFn: func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil)
			},
			DoMultiFn: func(cmd ...Completed) *valkeyresults {
				// Four commands is the MULTI, GET, GET, EXEC batch sent below.
				if len(cmd) == 4 {
					return &valkeyresults{s: []ValkeyResult{
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
							{typ: '+', string: "Delegate0"},
							{typ: '+', string: "Delegate1"},
						}}, nil),
					}}
				}
				return &valkeyresults{s: []ValkeyResult{
					newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil),
					newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil),
				}}
			},
			ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
				return ErrClosing
			},
			SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
				ch := make(chan error, 1)
				ch <- ErrClosing
				close(ch)
				return ch
			},
			ErrorFn: func() error {
				return ErrClosing
			},
			CloseFn: onClose,
		}
	}

	// exerciseDelegate drives every DedicatedClient entry point against the
	// wire from newDelegateWire, asserts the mocked replies come back
	// unchanged, and finally closes c.
	exerciseDelegate := func(t *testing.T, c DedicatedClient) {
		ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
		if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v := c.DoMulti(context.Background()); len(v) != 0 {
			t.Fatalf("received unexpected response %v", v)
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Info().Build(),
			c.B().Info().Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Multi().Build(),
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
			c.B().Exec().Build(),
		)[3].val.values {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-ch; err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		c.Close()
	}

	t.Run("Nodes", func(t *testing.T) {
		nodes := client.Nodes()
		if len(nodes) != 4 || nodes["127.0.0.1:0"] == nil || nodes["127.0.1.1:1"] == nil ||
			nodes["127.0.2.1:0"] == nil || nodes["127.0.3.1:1"] == nil {
			t.Fatalf("unexpected Nodes")
		}
	})

	t.Run("Delegate Do with no slot", func(t *testing.T) {
		c := client.B().Info().Build()
		if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "Info" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("Delegate Do", func(t *testing.T) {
		c := client.B().Get().Key("Do").Build()
		if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "Do" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("Delegate DoStream", func(t *testing.T) {
		c := client.B().Get().Key("Do").Build()
		if s := client.DoStream(context.Background(), c); s.Error().Error() != "Do" {
			t.Fatalf("unexpected response %v", s.Error())
		}
	})

	t.Run("Delegate DoMulti Empty", func(t *testing.T) {
		if resps := client.DoMulti(context.Background()); resps != nil {
			t.Fatalf("unexpected response %v", resps)
		}
	})

	t.Run("Delegate DoMultiStream Empty", func(t *testing.T) {
		if s := client.DoMultiStream(context.Background()); s.Error() != io.EOF {
			// Report the stream's own error; the outer err is always nil here.
			t.Fatalf("unexpected response %v", s.Error())
		}
	})

	t.Run("Delegate DoMulti Single Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K2{a}").Build()
		resps := client.DoMulti(context.Background(), c1, c2)
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("Delegate DoMultiStream Single Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K2{a}").Build()
		if s := client.DoMultiStream(context.Background(), c1, c2); s.Error().Error() != "K1{a}" {
			t.Fatalf("unexpected response %v", s.Error())
		}
	})

	t.Run("Delegate DoMulti Single Slot + Init Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Info().Build()
		resps := client.DoMulti(context.Background(), c1, c2)
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "INFO" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("Delegate DoMultiStream Single Slot + Init Slot", func(t *testing.T) {
		c1 := client.B().Info().Section("ANY").Build()
		c2 := client.B().Get().Key("K1{a}").Build()
		if s := client.DoMultiStream(context.Background(), c1, c2); s.Error().Error() != "ANY" {
			t.Fatalf("unexpected response %v", s.Error())
		}
	})

	t.Run("Delegate DoMulti Cross Slot + Init Slot", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMixCxSlot {
				t.Errorf("DoMulti should panic if Cross Slot + Init Slot")
			}
		}()
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K1{b}").Build()
		c3 := client.B().Info().Build()
		client.DoMulti(context.Background(), c1, c2, c3)
	})

	t.Run("Delegate DoMultiStream Cross Slot + Init Slot", func(t *testing.T) {
		defer func() {
			// Checked assertion: a missing or non-string panic must fail the
			// subtest instead of panicking inside this deferred function.
			msg, _ := recover().(string)
			if !strings.Contains(msg, "across multiple slots") {
				t.Errorf("DoMultiStream should panic if Cross Slot + Init Slot")
			}
		}()
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K1{b}").Build()
		c3 := client.B().Info().Build()
		client.DoMultiStream(context.Background(), c1, c2, c3)
	})

	t.Run("Delegate DoMulti Multi Slot", func(t *testing.T) {
		multi := make([]Completed, 500)
		for i := range multi {
			multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build()
		}
		resps := client.DoMulti(context.Background(), multi...)
		for i := range multi {
			if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
	})

	t.Run("Delegate DoCache", func(t *testing.T) {
		c := client.B().Get().Key("DoCache").Cache()
		if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "DoCache" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("Delegate DoMultiCache Empty", func(t *testing.T) {
		if resps := client.DoMultiCache(context.Background()); resps != nil {
			t.Fatalf("unexpected response %v", resps)
		}
	})

	t.Run("Delegate DoMultiCache Single Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Cache()
		c2 := client.B().Get().Key("K2{a}").Cache()
		resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second))
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("Delegate DoMultiCache Multi Slot", func(t *testing.T) {
		multi := make([]CacheableTTL, 500)
		for i := range multi {
			multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second)
		}
		resps := client.DoMultiCache(context.Background(), multi...)
		for i := range multi {
			if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
	})

	t.Run("Delegate Receive", func(t *testing.T) {
		c := client.B().Subscribe().Channel("ch").Build()
		hdl := func(message PubSubMessage) {}
		m.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) {
				t.Fatalf("unexpected command %v", subscribe)
			}
			return nil
		}
		if err := client.Receive(context.Background(), c, hdl); err != nil {
			t.Fatalf("unexpected response %v", err)
		}
	})

	t.Run("Delegate Receive Valkey Err", func(t *testing.T) {
		c := client.B().Subscribe().Channel("ch").Build()
		e := &ValkeyError{}
		m.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			return e
		}
		if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e {
			t.Fatalf("unexpected response %v", err)
		}
	})

	t.Run("Delegate Close", func(t *testing.T) {
		once := sync.Once{}
		called := make(chan struct{})
		m.CloseFn = func() {
			// Every node shares m, so CloseFn may run more than once.
			once.Do(func() { close(called) })
		}
		client.Close()
		<-called
		// Blocks until stopCh yields; ok is false only for a closed channel.
		if _, ok := <-client.stopCh; ok {
			t.Fatalf("stopCh should be closed")
		}
	})

	t.Run("Dedicated Err, but no retry", func(t *testing.T) {
		v := errors.New("fn err")
		if err := client.Dedicated(func(client DedicatedClient) error {
			return v
		}); err != v {
			t.Fatalf("unexpected err %v", err)
		}
	})

	t.Run("Dedicated Cross Slot Err", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMsgCxSlot {
				t.Errorf("Dedicated should panic if cross slots is used")
			}
		}()
		m.AcquireFn = func() wire { return &mockWire{} }
		client.Dedicated(func(c DedicatedClient) error {
			c.Do(context.Background(), c.B().Get().Key("a").Build()).Error()
			return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error()
		})
	})

	t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMsgCxSlot {
				t.Errorf("Dedicated should panic if cross slots is used")
			}
		}()
		m.AcquireFn = func() wire {
			return &mockWire{
				DoMultiFn: func(multi ...Completed) *valkeyresults {
					return &valkeyresults{s: []ValkeyResult{
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{typ: '+', string: "a"}}}, nil),
					}}
				},
			}
		}
		client.Dedicated(func(c DedicatedClient) (err error) {
			c.DoMulti(
				context.Background(),
				c.B().Multi().Build(),
				c.B().Get().Key("a").Build(),
				c.B().Exec().Build(),
			)
			c.DoMulti(
				context.Background(),
				c.B().Multi().Build(),
				c.B().Get().Key("b").Build(),
				c.B().Exec().Build(),
			)
			return nil
		})
	})

	t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) {
		m.AcquireFn = func() wire { return &mockWire{} }
		err := client.Dedicated(func(c DedicatedClient) (err error) {
			defer func() {
				// Turn the panic value into the returned error. Leaving err
				// nil when nothing panicked lets the check below report it.
				if r := recover(); r != nil {
					err = fmt.Errorf("%v", r)
				}
			}()
			c.DoMulti(
				context.Background(),
				c.B().Get().Key("a").Build(),
				c.B().Get().Key("b").Build(),
			)
			return nil
		})
		if err == nil || err.Error() != panicMsgCxSlot {
			t.Errorf("Multi should panic if cross slots is used")
		}
	})

	t.Run("Dedicated Delegate Receive Valkey Err", func(t *testing.T) {
		e := &ValkeyError{}
		w := &mockWire{
			ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
				return e
			},
		}
		m.AcquireFn = func() wire {
			return w
		}
		if err := client.Dedicated(func(c DedicatedClient) error {
			return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
		}); err != e {
			t.Fatalf("unexpected err %v", err)
		}
	})

	t.Run("Dedicated Delegate", func(t *testing.T) {
		closed := false
		w := newDelegateWire(func() { closed = true })
		m.AcquireFn = func() wire {
			return w
		}
		stored := false
		m.StoreFn = func(ww wire) {
			if ww != w {
				t.Fatalf("received unexpected wire %v", ww)
			}
			stored = true
		}
		if err := client.Dedicated(func(c DedicatedClient) error {
			exerciseDelegate(t, c)
			return nil
		}); err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		if !stored {
			t.Fatalf("Dedicated doesn't put back the wire")
		}
		if !closed {
			t.Fatalf("Dedicated doesn't delegate Close")
		}
	})

	// Same scenario as above, driven through Dedicate() and its cancel func.
	t.Run("Dedicate Delegate", func(t *testing.T) {
		closed := false
		w := newDelegateWire(func() { closed = true })
		m.AcquireFn = func() wire {
			return w
		}
		stored := false
		m.StoreFn = func(ww wire) {
			if ww != w {
				t.Fatalf("received unexpected wire %v", ww)
			}
			stored = true
		}
		c, cancel := client.Dedicate()
		exerciseDelegate(t, c)
		cancel()
		if !stored {
			t.Fatalf("Dedicated doesn't put back the wire")
		}
		if !closed {
			t.Fatalf("Dedicated doesn't delegate Close")
		}
	})

	t.Run("Dedicate Delegate Release On Close", func(t *testing.T) {
		stored := 0
		w := &mockWire{}
		m.AcquireFn = func() wire {
			return w
		}
		m.StoreFn = func(ww wire) {
			stored++
		}
		c, _ := client.Dedicate()
		c.Do(context.Background(), c.B().Get().Key("a").Build())
		c.Close()
		if stored != 1 {
			t.Fatalf("unexpected stored count %v", stored)
		}
	})

	t.Run("Dedicate Delegate No Duplicate Release", func(t *testing.T) {
		stored := 0
		w := &mockWire{}
		m.AcquireFn = func() wire {
			return w
		}
		m.StoreFn = func(ww wire) {
			stored++
		}
		c, cancel := client.Dedicate()
		c.Do(context.Background(), c.B().Get().Key("a").Build())
		c.Close()
		c.Close() // should have no effect
		cancel()  // should have no effect
		cancel()  // should have no effect
		if stored != 1 {
			t.Fatalf("unexpected stored count %v", stored)
		}
	})

	t.Run("Dedicated SetPubSubHooks Released", func(t *testing.T) {
		c, cancel := client.Dedicate()
		ch1 := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
		ch2 := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
		<-ch1
		cancel()
		<-ch2
	})

	t.Run("Dedicated SetPubSubHooks Close", func(t *testing.T) {
		c, cancel := client.Dedicate()
		defer cancel()
		ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
		c.Close()
		if err := <-ch; err != ErrClosing {
			// Report the received error, not the channel value.
			t.Fatalf("unexpected ret %v", err)
		}
	})

	t.Run("Dedicated SetPubSubHooks Empty Hooks", func(t *testing.T) {
		c, cancel := client.Dedicate()
		defer cancel()
		if ch := c.SetPubSubHooks(PubSubHooks{}); ch != nil {
			t.Fatalf("unexpected ret %v", ch)
		}
	})

	t.Run("Dedicate ErrDedicatedClientRecycled after released", func(t *testing.T) {
		check := func(err error) {
			if !errors.Is(err, ErrDedicatedClientRecycled) {
				t.Fatalf("unexpected err %v", err)
			}
		}
		// Release the dedicated client both ways (Close and cancel), then
		// verify every entry point reports ErrDedicatedClientRecycled.
		for _, closeFn := range []func(client DedicatedClient, cancel func()){
			func(client DedicatedClient, cancel func()) {
				client.Close()
			},
			func(client DedicatedClient, cancel func()) {
				cancel()
			},
		} {
			c, cancel := client.Dedicate()
			closeFn(c, cancel)
			for _, fn := range []func(){
				func() {
					resp := c.Do(context.Background(), c.B().Get().Key("k").Build())
					check(resp.Error())
				},
				func() {
					resp := c.DoMulti(context.Background(), c.B().Get().Key("k").Build())
					for _, r := range resp {
						check(r.Error())
					}
				},
				func() {
					err := c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {})
					check(err)
				},
				func() {
					ch := c.SetPubSubHooks(PubSubHooks{})
					check(<-ch)
				},
			} {
				fn()
			}
		}
	})
}

//gocyclo:ignore
func TestClusterClient_SendToOnlyPrimaryNodes(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) primaryNodeConn := &mockConn{ DoOverride: map[string]func(cmd Completed) ValkeyResult{ "CLUSTER SLOTS": func(cmd Completed) ValkeyResult { return slotsMultiResp }, "GET Do": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET Do"}, nil) }, "GET K1{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil) }, "GET K2{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil) }, "INFO": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "INFO"}, nil) }, }, DoMultiFn: func(multi ...Completed) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) } return &valkeyresults{s: resps} }, DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) ValkeyResult{ "GET DoCache": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET DoCache"}, nil) }, "GET K1{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil) }, "GET K2{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil) }, }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil) } return &valkeyresults{s: resps} }, } replicaNodeConn := &mockConn{} client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"},
SendToReplicas: func(cmd Completed) bool { return false }, }, func(dst string, opt *ClientOption) conn { if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary nodes return primaryNodeConn } else { // replica nodes return replicaNodeConn } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } t.Run("Do", func(t *testing.T) { c := client.B().Get().Key("Do").Build() if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Single Slot", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Build() c2 := client.B().Get().Key("K2{a}").Build() resps := client.DoMulti(context.Background(), c1, c2) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Single Slot + Init Slot", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Build() c2 := client.B().Info().Build() resps := client.DoMulti(context.Background(), c1, c2) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "INFO" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) { defer func() { if err := recover(); err != panicMixCxSlot { t.Errorf("DoMulti should panic if Cross Slot + Init Slot") } }() c1 := client.B().Get().Key("K1{a}").Build() c2 := client.B().Get().Key("K1{b}").Build() c3 := client.B().Info().Build() client.DoMulti(context.Background(), c1, c2, c3) }) t.Run("DoMulti Multi Slot", func(t *testing.T) { multi := make([]Completed, 500) for i := 0; i < len(multi); i++ { multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() } resps := client.DoMulti(context.Background(), multi...) 
for i := 0; i < len(multi); i++ { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } }) t.Run("DoCache", func(t *testing.T) { c := client.B().Get().Key("DoCache").Cache() if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMultiCache Single Slot", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Cache() c2 := client.B().Get().Key("K2{a}").Cache() resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second)) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMultiCache Multi Slot", func(t *testing.T) { multi := make([]CacheableTTL, 500) for i := 0; i < len(multi); i++ { multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second) } resps := client.DoMultiCache(context.Background(), multi...) 
for i := 0; i < len(multi); i++ { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } }) t.Run("Receive", func(t *testing.T) { c := client.B().Subscribe().Channel("ch").Build() hdl := func(message PubSubMessage) {} primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { t.Fatalf("unexpected command %v", subscribe) } return nil } replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { t.Fatalf("unexpected command %v", subscribe) } return nil } if err := client.Receive(context.Background(), c, hdl); err != nil { t.Fatalf("unexpected response %v", err) } }) t.Run("Receive Valkey Err", func(t *testing.T) { c := client.B().Subscribe().Channel("ch").Build() e := &ValkeyError{} primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e } replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e } if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e { t.Fatalf("unexpected response %v", err) } }) t.Run("Dedicated Err", func(t *testing.T) { v := errors.New("fn err") if err := client.Dedicated(func(client DedicatedClient) error { return v }); err != v { t.Fatalf("unexpected err %v", err) } }) t.Run("Dedicated Cross Slot Err", func(t *testing.T) { defer func() { if err := recover(); err != panicMsgCxSlot { t.Errorf("Dedicated should panic if cross slots is used") } }() primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } client.Dedicated(func(c DedicatedClient) error { c.Do(context.Background(), c.B().Get().Key("a").Build()).Error() return c.Do(context.Background(), 
c.B().Get().Key("b").Build()).Error() }) }) t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) { defer func() { if err := recover(); err != panicMsgCxSlot { t.Errorf("Dedicated should panic if cross slots is used") } }() primaryNodeConn.AcquireFn = func() wire { return &mockWire{ DoMultiFn: func(multi ...Completed) *valkeyresults { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{typ: '+', string: "a"}}}, nil), }} }, } } client.Dedicated(func(c DedicatedClient) (err error) { c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), ) c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("b").Build(), c.B().Exec().Build(), ) return nil }) }) t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) { primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } err := client.Dedicated(func(c DedicatedClient) (err error) { defer func() { err = errors.New(recover().(string)) }() c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), c.B().Get().Key("b").Build(), ) return nil }) if err == nil || err.Error() != panicMsgCxSlot { t.Errorf("Multi should panic if cross slots is used") } }) t.Run("Dedicated Receive Valkey Err", func(t *testing.T) { e := &ValkeyError{} w := &mockWire{ ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e }, } primaryNodeConn.AcquireFn = func() wire { return w } if err := client.Dedicated(func(c DedicatedClient) error { return c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}) }); err != e { t.Fatalf("unexpected err %v", err) } }) t.Run("Dedicated", func(t *testing.T) { closed := false w := &mockWire{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: 
"Delegate"}, nil) }, DoMultiFn: func(cmd ...Completed) *valkeyresults { if len(cmd) == 4 { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "Delegate0"}, {typ: '+', string: "Delegate1"}, }}, nil), }} } return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil), newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil), }} }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return ErrClosing }, SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { ch := make(chan error, 1) ch <- ErrClosing close(ch) return ch }, ErrorFn: func() error { return ErrClosing }, CloseFn: func() { closed = true }, } primaryNodeConn.AcquireFn = func() wire { return w } stored := false primaryNodeConn.StoreFn = func(ww wire) { if ww != w { t.Fatalf("received unexpected wire %v", ww) } stored = true } if err := client.Dedicated(func(c DedicatedClient) error { ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { t.Fatalf("unexpected response %v %v", v, err) } if v := c.DoMulti(context.Background()); len(v) != 0 { t.Fatalf("received unexpected response %v", v) } for i, resp := range c.DoMulti( context.Background(), c.B().Info().Build(), c.B().Info().Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, 
resp := range c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), )[3].val.values { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-ch; err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } c.Close() return nil }); err != nil { t.Fatalf("unexpected err %v", err) } if !stored { t.Fatalf("Dedicated desn't put back the wire") } if !closed { t.Fatalf("Dedicated desn't delegate Close") } }) t.Run("Dedicate", func(t *testing.T) { closed := false w := &mockWire{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil) }, DoMultiFn: func(cmd ...Completed) *valkeyresults { if len(cmd) == 4 { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "Delegate0"}, {typ: '+', string: "Delegate1"}, }}, nil), }} } return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil), newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil), }} }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return ErrClosing }, SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { ch := make(chan error, 1) ch <- ErrClosing close(ch) return ch }, ErrorFn: func() error { return ErrClosing }, CloseFn: func() { closed = true }, } primaryNodeConn.AcquireFn = 
func() wire { return w }
		stored := false
		primaryNodeConn.StoreFn = func(ww wire) {
			if ww != w {
				t.Fatalf("received unexpected wire %v", ww)
			}
			stored = true
		}
		c, cancel := client.Dedicate()
		ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
		if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v := c.DoMulti(context.Background()); len(v) != 0 {
			t.Fatalf("received unexpected response %v", v)
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Info().Build(),
			c.B().Info().Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Multi().Build(),
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
			c.B().Exec().Build(),
		)[3].val.values {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-ch; err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		c.Close()
		cancel()

		if !stored {
			t.Fatalf("Dedicated desn't put back the wire")
		}
		if !closed {
			t.Fatalf("Dedicated desn't delegate Close")
		}
	})
}

// TestClusterClient_SendToOnlyReplicaNodes drives a cluster client that is
// configured with a SendToReplicas callback returning true for every command.
//
// The client is built on two shared mock connections: the connection factory
// hands out primaryNodeConn for "127.0.0.1:0" and "127.0.2.1:0" and
// replicaNodeConn for every other address. CLUSTER SLOTS is answered by the
// primary mock with slotsMultiResp.
//
// The subtests reassign hooks (ReceiveFn, AcquireFn, StoreFn) on those shared
// mocks, so they are written to run sequentially in declaration order.
//
//gocyclo:ignore
func TestClusterClient_SendToOnlyReplicaNodes(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	// The primary mock only knows the topology query plus the two commands
	// listed here.
	primaryNodeConn := &mockConn{
		DoOverride: map[string]func(cmd Completed) ValkeyResult{
			"CLUSTER SLOTS": func(cmd Completed) ValkeyResult {
				return slotsMultiResp
			},
			"INFO": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "INFO"}, nil)
			},
			"GET K1{a}": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil)
			},
		},
	}
	// The replica mock echoes each command of a batch back as a simple string,
	// which lets the subtests verify routing and ordering by comparing the
	// reply with the command text.
	replicaNodeConn := &mockConn{
		DoMultiFn: func(multi ...Completed) *valkeyresults {
			resps := make([]ValkeyResult, len(multi))
			for i, cmd := range multi {
				resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil)
			}
			return &valkeyresults{s: resps}
		},
		DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults {
			resps := make([]ValkeyResult, len(multi))
			for i, cmd := range multi {
				resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil)
			}
			return &valkeyresults{s: resps}
		},
		DoOverride: map[string]func(cmd Completed) ValkeyResult{
			"GET Do": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET Do"}, nil)
			},
			"GET K1{a}": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil)
			},
			"GET K2{a}": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil)
			},
		},
		DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) ValkeyResult{
			"GET DoCache": func(cmd Cacheable, ttl time.Duration) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET DoCache"}, nil)
			},
		},
	}

	client, err := newClusterClient(
		&ClientOption{
			InitAddress: []string{"127.0.0.1:0"},
			SendToReplicas: func(cmd Completed) bool {
				return true
			},
		},
		func(dst string, opt *ClientOption) conn {
			if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary nodes
				return primaryNodeConn
			}
			// replica nodes
			return replicaNodeConn
		},
		newRetryer(defaultRetryDelayFn),
	)
	if err != nil {
		t.Fatalf("unexpected err %v", err)
	}

	t.Run("Do", func(t *testing.T) {
		c := client.B().Get().Key("Do").Build()
		if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMulti Single Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K2{a}").Build()
		resps := client.DoMulti(context.Background(), c1, c2)
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMulti Single Slot + Init Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Info().Build()
		resps := client.DoMulti(context.Background(), c1, c2)
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "INFO" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) {
		// The call below is expected to panic with panicMixCxSlot; anything
		// else (including no panic at all) is reported as a failure.
		defer func() {
			if err := recover(); err != panicMixCxSlot {
				t.Errorf("DoMulti should panic if Cross Slot + Init Slot")
			}
		}()
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K1{b}").Build()
		c3 := client.B().Info().Build()
		client.DoMulti(context.Background(), c1, c2, c3)
	})

	t.Run("DoMulti Multi Slot", func(t *testing.T) {
		multi := make([]Completed, 500)
		for i := 0; i < len(multi); i++ {
			multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build()
		}
		resps := client.DoMulti(context.Background(), multi...)
		// Responses must line up index-for-index with the submitted commands.
		for i := 0; i < len(multi); i++ {
			if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
	})

	t.Run("DoCache", func(t *testing.T) {
		c := client.B().Get().Key("DoCache").Cache()
		if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMultiCache Single Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Cache()
		c2 := client.B().Get().Key("K2{a}").Cache()
		resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second))
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMultiCache Multi Slot", func(t *testing.T) {
		multi := make([]CacheableTTL, 500)
		for i := 0; i < len(multi); i++ {
			multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second)
		}
		resps := client.DoMultiCache(context.Background(), multi...)
		// Responses must line up index-for-index with the submitted commands.
		for i := 0; i < len(multi); i++ {
			if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
	})

	t.Run("Receive", func(t *testing.T) {
		c := client.B().Subscribe().Channel("ch").Build()
		hdl := func(message PubSubMessage) {}
		// Both mocks verify the forwarded command, so the assertion holds
		// whichever node the client picks.
		primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) {
				t.Fatalf("unexpected command %v", subscribe)
			}
			return nil
		}
		replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) {
				t.Fatalf("unexpected command %v", subscribe)
			}
			return nil
		}
		if err := client.Receive(context.Background(), c, hdl); err != nil {
			t.Fatalf("unexpected response %v", err)
		}
	})

	t.Run("Receive Valkey Err", func(t *testing.T) {
		c := client.B().Ssubscribe().Channel("ch").Build()
		e := &ValkeyError{}
		primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			return e
		}
		replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			return e
		}
		// The exact error value returned by the connection must be surfaced.
		if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e {
			t.Fatalf("unexpected response %v", err)
		}
	})

	t.Run("Dedicated Cross Slot Err", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMsgCxSlot {
				t.Errorf("Dedicated should panic if cross slots is used")
			}
		}()
		primaryNodeConn.AcquireFn = func() wire {
			return &mockWire{}
		}
		client.Dedicated(func(c DedicatedClient) error {
			c.Do(context.Background(), c.B().Get().Key("a").Build()).Error()
			return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error()
		})
	})

	t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMsgCxSlot {
				t.Errorf("Dedicated should panic if cross slots is used")
			}
		}()
		primaryNodeConn.AcquireFn = func() wire {
			return &mockWire{
				DoMultiFn: func(multi ...Completed) *valkeyresults {
					return &valkeyresults{s: []ValkeyResult{
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{typ: '+', string: "a"}}}, nil),
					}}
				},
			}
		}
		client.Dedicated(func(c DedicatedClient) (err error) {
			c.DoMulti(
				context.Background(),
				c.B().Multi().Build(),
				c.B().Get().Key("a").Build(),
				c.B().Exec().Build(),
			)
			c.DoMulti(
				context.Background(),
				c.B().Multi().Build(),
				c.B().Get().Key("b").Build(),
				c.B().Exec().Build(),
			)
			return nil
		})
	})

	t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) {
		primaryNodeConn.AcquireFn = func() wire {
			return &mockWire{}
		}
		err := client.Dedicated(func(c DedicatedClient) (err error) {
			defer func() {
				// Turn the expected panic into the returned error. When
				// nothing panicked, recover() yields nil and err is left nil,
				// so the check below reports the failure instead of this
				// deferred function crashing on a nil type assertion.
				if r := recover(); r != nil {
					err = fmt.Errorf("%v", r)
				}
			}()
			c.DoMulti(
				context.Background(),
				c.B().Get().Key("a").Build(),
				c.B().Get().Key("b").Build(),
			)
			return nil
		})
		if err == nil || err.Error() != panicMsgCxSlot {
			t.Errorf("Multi should panic if cross slots is used")
		}
	})

	t.Run("Dedicated Receive Valkey Err", func(t *testing.T) {
		e := &ValkeyError{}
		w := &mockWire{
			ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
				return e
			},
		}
		primaryNodeConn.AcquireFn = func() wire {
			return w
		}
		replicaNodeConn.AcquireFn = func() wire {
			return w
		} // Subscribe can work on replicas
		if err := client.Dedicated(func(c DedicatedClient) error {
			return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
		}); err != e {
			t.Fatalf("unexpected err %v", err)
		}
	})

	t.Run("Dedicated", func(t *testing.T) {
		closed := false
		w := &mockWire{
			DoFn: func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil)
			},
			DoMultiFn: func(cmd ...Completed) *valkeyresults {
				// A four-command batch is the MULTI/GET/GET/EXEC call made
				// below: its last reply carries the two values as an array.
				if len(cmd) == 4 {
					return &valkeyresults{s: []ValkeyResult{
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
							{typ: '+', string: "Delegate0"},
							{typ: '+', string: "Delegate1"},
						}}, nil),
					}}
				}
				return &valkeyresults{s: []ValkeyResult{
					newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil),
					newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil),
				}}
			},
			ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
				return ErrClosing
			},
			SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
				ch := make(chan error, 1)
				ch <- ErrClosing
				close(ch)
				return ch
			},
			ErrorFn: func() error {
				return ErrClosing
			},
			CloseFn: func() {
				closed = true
			},
		}
		primaryNodeConn.AcquireFn = func() wire {
			return w
		}
		stored := false
		primaryNodeConn.StoreFn = func(ww wire) {
			if ww != w {
				t.Fatalf("received unexpected wire %v", ww)
			}
			stored = true
		}
		if err := client.Dedicated(func(c DedicatedClient) error {
			ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
			if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
				t.Fatalf("unexpected response %v %v", v, err)
			}
			if v := c.DoMulti(context.Background()); len(v) != 0 {
				t.Fatalf("received unexpected response %v", v)
			}
			for i, resp := range c.DoMulti(
				context.Background(),
				c.B().Info().Build(),
				c.B().Info().Build(),
			) {
				if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
					t.Fatalf("unexpected response %v %v", v, err)
				}
			}
			for i, resp := range c.DoMulti(
				context.Background(),
				c.B().Get().Key("a").Build(),
				c.B().Get().Key("a").Build(),
			) {
				if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
					t.Fatalf("unexpected response %v %v", v, err)
				}
			}
			for i, resp := range c.DoMulti(
				context.Background(),
				c.B().Multi().Build(),
				c.B().Get().Key("a").Build(),
				c.B().Get().Key("a").Build(),
				c.B().Exec().Build(),
			)[3].val.values {
				if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
					t.Fatalf("unexpected response %v %v", v, err)
				}
			}
			if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
				t.Fatalf("unexpected ret %v", err)
			}
			if err := <-ch; err != ErrClosing {
				t.Fatalf("unexpected ret %v", err)
			}
			if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
				t.Fatalf("unexpected ret %v", err)
			}
			c.Close()
			return nil
		}); err != nil {
			t.Fatalf("unexpected err %v", err)
		}

		if !stored {
			t.Fatalf("Dedicated desn't put back the wire")
		}
		if !closed {
			t.Fatalf("Dedicated desn't delegate Close")
		}
	})

	t.Run("Dedicate", func(t *testing.T) {
		closed := false
		w := &mockWire{
			DoFn: func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil)
			},
			DoMultiFn: func(cmd ...Completed) *valkeyresults {
				// A four-command batch is the MULTI/GET/GET/EXEC call made
				// below: its last reply carries the two values as an array.
				if len(cmd) == 4 {
					return &valkeyresults{s: []ValkeyResult{
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
							{typ: '+', string: "Delegate0"},
							{typ: '+', string: "Delegate1"},
						}}, nil),
					}}
				}
				return &valkeyresults{s: []ValkeyResult{
					newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil),
					newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil),
				}}
			},
			ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
				return ErrClosing
			},
			SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
				ch := make(chan error, 1)
				ch <- ErrClosing
				close(ch)
				return ch
			},
			ErrorFn: func() error {
				return ErrClosing
			},
			CloseFn: func() {
				closed = true
			},
		}
		primaryNodeConn.AcquireFn = func() wire {
			return w
		}
		stored := false
		primaryNodeConn.StoreFn = func(ww wire) {
			if ww != w {
				t.Fatalf("received unexpected wire %v", ww)
			}
			stored = true
		}
		c, cancel := client.Dedicate()
		ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
		if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v := c.DoMulti(context.Background()); len(v) != 0 {
			t.Fatalf("received unexpected response %v", v)
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Info().Build(),
			c.B().Info().Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			context.Background(),
			c.B().Multi().Build(),
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
			c.B().Exec().Build(),
		)[3].val.values {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-ch; err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		c.Close()
		cancel()

		if !stored {
			t.Fatalf("Dedicated desn't put back the wire")
		}
		if !closed {
			t.Fatalf("Dedicated desn't delegate Close")
		}
	})
}

//gocyclo:ignore
func TestClusterClient_SendReadOperationToReplicaNodesWriteOperationToPrimaryNodes(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	primaryNodeConn := &mockConn{
		DoOverride: map[string]func(cmd Completed) ValkeyResult{
			"CLUSTER SLOTS": func(cmd Completed) ValkeyResult
{ return slotsMultiResp }, "INFO": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "INFO"}, nil) }, "SET Do V": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "SET Do V"}, nil) }, "SET K2{a} V2{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "SET K2{a} V2{a}"}, nil) }, }, DoMultiFn: func(multi ...Completed) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "SET K1") { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) continue } if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "SET K2") { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) continue } if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "MULTI") { resps[i] = newResult(ValkeyMessage{typ: '+', string: "MULTI"}, nil) continue } if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "EXEC") { resps[i] = newResult(ValkeyMessage{typ: '+', string: "EXEC"}, nil) continue } return &valkeyresults{ s: []ValkeyResult{}, } } return &valkeyresults{s: resps} }, } replicaNodeConn := &mockConn{ DoOverride: map[string]func(cmd Completed) ValkeyResult{ "GET Do": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET Do"}, nil) }, "GET K1{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil) }, "GET K2{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil) }, }, DoMultiFn: func(multi ...Completed) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "GET K1") { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) continue } return &valkeyresults{ s: 
[]ValkeyResult{}, } } return &valkeyresults{s: resps} }, DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) ValkeyResult{ "GET DoCache": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET DoCache"}, nil) }, "GET K1{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil) }, "GET K2{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil) }, }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { if strings.HasPrefix(strings.Join(cmd.Cmd.Commands(), " "), "GET K1") { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil) continue } return &valkeyresults{ s: []ValkeyResult{}, } } return &valkeyresults{s: resps} }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return cmd.IsReadOnly() }, }, func(dst string, opt *ClientOption) conn { if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary nodes return primaryNodeConn } else { // replica nodes return replicaNodeConn } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } t.Run("Do read operation", func(t *testing.T) { c := client.B().Get().Key("Do").Build() if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("Do write operation", func(t *testing.T) { c := client.B().Set().Key("Do").Value("V").Build() if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "SET Do V" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Single Slot All Read Operations", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Build() c2 := 
client.B().Get().Key("K2{a}").Build() resps := client.DoMulti(context.Background(), c1, c2) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Single Slot Read Operation And Write Operation", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Build() c2 := client.B().Set().Key("K2{a}").Value("V2{a}").Build() resps := client.DoMulti(context.Background(), c1, c2) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "SET K2{a} V2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Single Slot Operations + Init Slot", func(t *testing.T) { c1 := client.B().Multi().Build() c2 := client.B().Set().Key("K1{a}").Value("V1{a}").Build() c3 := client.B().Set().Key("K2{a}").Value("V2{a}").Build() c4 := client.B().Exec().Build() resps := client.DoMulti(context.Background(), c1, c2, c3, c4) if v, err := resps[0].ToString(); err != nil || v != "MULTI" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "SET K1{a} V1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "SET K2{a} V2{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "EXEC" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) { defer func() { if err := recover(); err != panicMixCxSlot { t.Errorf("DoMulti should panic if Cross Slot + Init Slot") } }() c1 := client.B().Get().Key("K1{a}").Build() c2 := client.B().Get().Key("K1{b}").Build() c3 := client.B().Info().Build() client.DoMulti(context.Background(), c1, c2, c3) }) t.Run("DoMulti Multi Slot All Read Operations", 
func(t *testing.T) { multi := make([]Completed, 500) for i := 0; i < len(multi); i++ { multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() } resps := client.DoMulti(context.Background(), multi...) for i := 0; i < len(multi); i++ { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } }) t.Run("DoMulti Multi Slot Read & Write Operations", func(t *testing.T) { multi := make([]Completed, 500) for i := 0; i < len(multi); i++ { if i%2 == 0 { multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() } else { multi[i] = client.B().Set().Key(fmt.Sprintf("K2{%d}", i)).Value(fmt.Sprintf("V2{%d}", i)).Build() } } resps := client.DoMulti(context.Background(), multi...) for i := 0; i < len(multi); i++ { if i%2 == 0 { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } else { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("SET K2{%d} V2{%d}", i, i) { t.Fatalf("unexpected response %v %v", v, err) } } } }) t.Run("DoCache Operation", func(t *testing.T) { c := client.B().Get().Key("DoCache").Cache() if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMultiCache Single Slot", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Cache() c2 := client.B().Get().Key("K2{a}").Cache() resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second)) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMultiCache Multi Slot", func(t *testing.T) { multi := make([]CacheableTTL, 500) for i := 0; i < len(multi); i++ { multi[i] = 
CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second) } resps := client.DoMultiCache(context.Background(), multi...) for i := 0; i < len(multi); i++ { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } }) t.Run("Receive", func(t *testing.T) { c := client.B().Subscribe().Channel("ch").Build() hdl := func(message PubSubMessage) {} primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { t.Fatalf("unexpected command %v", subscribe) } return nil } replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { t.Fatalf("unexpected command %v", subscribe) } return nil } if err := client.Receive(context.Background(), c, hdl); err != nil { t.Fatalf("unexpected response %v", err) } }) t.Run("Receive Valkey Err", func(t *testing.T) { c := client.B().Ssubscribe().Channel("ch").Build() e := &ValkeyError{} primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e } replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e } if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e { t.Fatalf("unexpected response %v", err) } }) t.Run("Dedicated Cross Slot Err", func(t *testing.T) { defer func() { if err := recover(); err != panicMsgCxSlot { t.Errorf("Dedicated should panic if cross slots is used") } }() primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } client.Dedicated(func(c DedicatedClient) error { c.Do(context.Background(), c.B().Get().Key("a").Build()).Error() return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error() }) }) t.Run("Dedicated Cross Slot Err 
Multi", func(t *testing.T) { defer func() { if err := recover(); err != panicMsgCxSlot { t.Errorf("Dedicated should panic if cross slots is used") } }() primaryNodeConn.AcquireFn = func() wire { return &mockWire{ DoMultiFn: func(multi ...Completed) *valkeyresults { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{typ: '+', string: "a"}}}, nil), }} }, } } client.Dedicated(func(c DedicatedClient) (err error) { c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), ) c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("b").Build(), c.B().Exec().Build(), ) return nil }) }) t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) { primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } err := client.Dedicated(func(c DedicatedClient) (err error) { defer func() { err = errors.New(recover().(string)) }() c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), c.B().Get().Key("b").Build(), ) return nil }) if err == nil || err.Error() != panicMsgCxSlot { t.Errorf("Multi should panic if cross slots is used") } }) t.Run("Dedicated Receive Valkey Err", func(t *testing.T) { e := &ValkeyError{} w := &mockWire{ ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e }, } primaryNodeConn.AcquireFn = func() wire { return w } replicaNodeConn.AcquireFn = func() wire { return w } // Subscribe can work on replicas if err := client.Dedicated(func(c DedicatedClient) error { return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {}) }); err != e { t.Fatalf("unexpected err %v", err) } }) t.Run("Dedicated", func(t *testing.T) { closed := false w := &mockWire{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: 
"Delegate"}, nil) }, DoMultiFn: func(cmd ...Completed) *valkeyresults { if len(cmd) == 4 { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "Delegate0"}, {typ: '+', string: "Delegate1"}, }}, nil), }} } return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil), newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil), }} }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return ErrClosing }, SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { ch := make(chan error, 1) ch <- ErrClosing close(ch) return ch }, ErrorFn: func() error { return ErrClosing }, CloseFn: func() { closed = true }, } primaryNodeConn.AcquireFn = func() wire { return w } stored := false primaryNodeConn.StoreFn = func(ww wire) { if ww != w { t.Fatalf("received unexpected wire %v", ww) } stored = true } if err := client.Dedicated(func(c DedicatedClient) error { ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { t.Fatalf("unexpected response %v %v", v, err) } if v := c.DoMulti(context.Background()); len(v) != 0 { t.Fatalf("received unexpected response %v", v) } for i, resp := range c.DoMulti( context.Background(), c.B().Info().Build(), c.B().Info().Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, 
resp := range c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), )[3].val.values { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-ch; err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } c.Close() return nil }); err != nil { t.Fatalf("unexpected err %v", err) } if !stored { t.Fatalf("Dedicated desn't put back the wire") } if !closed { t.Fatalf("Dedicated desn't delegate Close") } }) t.Run("Dedicate", func(t *testing.T) { closed := false w := &mockWire{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil) }, DoMultiFn: func(cmd ...Completed) *valkeyresults { if len(cmd) == 4 { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "Delegate0"}, {typ: '+', string: "Delegate1"}, }}, nil), }} } return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil), newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil), }} }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return ErrClosing }, SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { ch := make(chan error, 1) ch <- ErrClosing close(ch) return ch }, ErrorFn: func() error { return ErrClosing }, CloseFn: func() { closed = true }, } primaryNodeConn.AcquireFn = 
func() wire { return w } stored := false primaryNodeConn.StoreFn = func(ww wire) { if ww != w { t.Fatalf("received unexpected wire %v", ww) } stored = true } c, cancel := client.Dedicate() ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { t.Fatalf("unexpected response %v %v", v, err) } if v := c.DoMulti(context.Background()); len(v) != 0 { t.Fatalf("received unexpected response %v", v) } for i, resp := range c.DoMulti( context.Background(), c.B().Info().Build(), c.B().Info().Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), )[3].val.values { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-ch; err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } c.Close() cancel() if !stored { t.Fatalf("Dedicated desn't put back the wire") } if !closed { t.Fatalf("Dedicated desn't delegate Close") } }) } //gocyclo:ignore func TestClusterClient_SendPrimaryNodeOnlyButOneSlotAssigned(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) primaryNodeConn := 
&mockConn{ DoOverride: map[string]func(cmd Completed) ValkeyResult{ "CLUSTER SLOTS": func(cmd Completed) ValkeyResult { return singleSlotResp }, "INFO": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "INFO"}, nil) }, }, DoMultiFn: func(multi ...Completed) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "MULTI") { resps[i] = newResult(ValkeyMessage{typ: '+', string: "MULTI"}, nil) continue } if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "EXEC") { resps[i] = newResult(ValkeyMessage{typ: '+', string: "EXEC"}, nil) continue } return &valkeyresults{ s: []ValkeyResult{}, } } return &valkeyresults{s: resps} }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return false }, }, func(dst string, opt *ClientOption) conn { return primaryNodeConn }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } t.Run("DoMulti Init Slot Operations", func(t *testing.T) { c1 := client.B().Multi().Build() c2 := client.B().Exec().Build() resps := client.DoMulti(context.Background(), c1, c2) if v, err := resps[0].ToString(); err != nil || v != "MULTI" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "EXEC" { t.Fatalf("unexpected response %v %v", v, err) } }) } //gocyclo:ignore func TestClusterClientErr(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) t.Run("not refresh on context error", func(t *testing.T) { var count int64 ctx, cancel := context.WithCancel(context.Background()) cancel() v := ctx.Err() m := &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { atomic.AddInt64(&count, 1) return slotsResp } return newErrResult(v) }, DoStreamFn: func(cmd Completed) ValkeyResultStream { return ValkeyResultStream{e: v} }, 
DoMultiFn: func(multi ...Completed) *valkeyresults { res := make([]ValkeyResult, len(multi)) for i := range res { res[i] = newErrResult(v) } return &valkeyresults{s: res} }, DoMultiStreamFn: func(cmd ...Completed) MultiValkeyResultStream { return MultiValkeyResultStream{e: v} }, DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newErrResult(v) }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { res := make([]ValkeyResult, len(multi)) for i := range res { res[i] = newErrResult(v) } return &valkeyresults{s: res} }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return v }, } client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return m }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if err := client.Do(ctx, client.B().Get().Key("a").Build()).Error(); err != v { t.Fatalf("unexpected err %v", err) } if s := client.DoStream(ctx, client.B().Get().Key("a").Build()); s.Error() != v { t.Fatalf("unexpected err %v", s.Error()) } if err := client.DoMulti(ctx, client.B().Get().Key("a").Build())[0].Error(); err != v { t.Fatalf("unexpected err %v", err) } if s := client.DoMultiStream(ctx, client.B().Get().Key("a").Build()); s.Error() != v { t.Fatalf("unexpected err %v", s.Error()) } if err := client.DoCache(ctx, client.B().Get().Key("a").Cache(), 100).Error(); err != v { t.Fatalf("unexpected err %v", err) } if err := client.DoMultiCache(ctx, CT(client.B().Get().Key("a").Cache(), 100))[0].Error(); err != v { t.Fatalf("unexpected err %v", err) } if err := client.Receive(ctx, client.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != v { t.Fatalf("unexpected err %v", err) } if c := atomic.LoadInt64(&count); c != 1 { t.Fatalf("unexpected refresh count %v", c) } }) t.Run("refresh err on pick", func(t *testing.T) { var first int64 v := errors.New("refresh err") m := 
&mockConn{ DoFn: func(cmd Completed) ValkeyResult { if atomic.AddInt64(&first, 1) == 1 { return singleSlotResp } return newErrResult(v) }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return v }, } client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return m }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if err := client.Do(context.Background(), client.B().Get().Key("a").Build()).Error(); err != v { t.Fatalf("unexpected err %v", err) } if s := client.DoStream(context.Background(), client.B().Get().Key("a").Build()); s.Error() != v { t.Fatalf("unexpected err %v", s.Error()) } if err := client.DoMulti(context.Background(), client.B().Get().Key("a").Build())[0].Error(); err != v { t.Fatalf("unexpected err %v", err) } if s := client.DoMultiStream(context.Background(), client.B().Get().Key("a").Build()); s.Error() != v { t.Fatalf("unexpected err %v", s.Error()) } for _, resp := range client.DoMulti(context.Background(), client.B().Get().Key("a").Build(), client.B().Get().Key("b").Build()) { if err := resp.Error(); err != v { t.Fatalf("unexpected err %v", err) } } if err := client.DoCache(context.Background(), client.B().Get().Key("a").Cache(), 100).Error(); err != v { t.Fatalf("unexpected err %v", err) } if err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100))[0].Error(); err != v { t.Fatalf("unexpected err %v", err) } for _, resp := range client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100), CT(client.B().Get().Key("b").Cache(), 100)) { if err := resp.Error(); err != v { t.Fatalf("unexpected err %v", err) } } if err := client.Receive(context.Background(), client.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != v { t.Fatalf("unexpected err %v", err) } }) t.Run("refresh empty on pick", func(t *testing.T) { m := 
&mockConn{DoFn: func(cmd Completed) ValkeyResult { return singleSlotResp }} client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return m }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if err := client.Do(context.Background(), client.B().Get().Key("a").Build()).Error(); err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } if err := client.DoMulti(context.Background(), client.B().Get().Key("a").Build())[0].Error(); err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } for _, resp := range client.DoMulti(context.Background(), client.B().Get().Key("a").Build(), client.B().Get().Key("b").Build()) { if err := resp.Error(); err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } } if err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100))[0].Error(); err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } for _, resp := range client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100), CT(client.B().Get().Key("b").Cache(), 100)) { if err := resp.Error(); err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } } }) t.Run("refresh empty on pick in dedicated wire", func(t *testing.T) { m := &mockConn{DoFn: func(cmd Completed) ValkeyResult { return singleSlotResp }} client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return m }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } var ch <-chan error if err := client.Dedicated(func(c DedicatedClient) error { ch = c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) return c.Do(context.Background(), c.B().Get().Key("a").Build()).Error() }); err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } if err := <-ch; err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } }) t.Run("refresh empty on pick in dedicated wire (multi)", func(t 
*testing.T) { m := &mockConn{DoFn: func(cmd Completed) ValkeyResult { return singleSlotResp }} client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return m }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } var ch <-chan error if err := client.Dedicated(func(c DedicatedClient) error { ch = c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) for _, v := range c.DoMulti(context.Background(), c.B().Get().Key("a").Build()) { if err := v.Error(); err != nil { return err } } return nil }); err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } if err := <-ch; err != ErrNoSlot { t.Fatalf("unexpected err %v", err) } }) t.Run("slot reconnect", func(t *testing.T) { var count, check int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { atomic.AddInt64(&check, 1) return &mockConn{DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiResp } if atomic.AddInt64(&count, 1) <= 3 { return newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :0"}, nil) } return newResult(ValkeyMessage{typ: '+', string: "b"}, nil) }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } if atomic.LoadInt64(&check) != 6 { t.Fatalf("unexpected check count %v", check) } }) t.Run("slot moved", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiResp } if atomic.AddInt64(&count, 1) <= 3 { return newResult(ValkeyMessage{typ: 
'-', string: "MOVED 0 :1"}, nil) } return newResult(ValkeyMessage{typ: '+', string: "b"}, nil) }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved redirect once", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiResp } if strings.Contains(dst, ":0") { atomic.AddInt64(&count, 1) return newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :2"}, nil) } return newResult(ValkeyMessage{typ: '+', string: "b"}, nil) }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } for i := 0; i < 10; i++ { if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } } if atomic.LoadInt64(&count) != 1 { t.Fatalf("unexpected count %v", count) } }) t.Run("slot moved DoMulti (single)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { ret := make([]ValkeyResult, len(multi)) if atomic.AddInt64(&count, 1) <= 3 { for i := range ret { ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil) } return &valkeyresults{s: ret} } for i := range ret { ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Commands()[1]}, nil) } return &valkeyresults{s: ret} }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) 
} if v, err := client.DoMulti(context.Background(), client.B().Get().Key("a").Build())[0].ToString(); err != nil || v != "a" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti (single) redirect once", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { ret := make([]ValkeyResult, len(multi)) if strings.Contains(dst, ":0") { atomic.AddInt64(&count, 1) for i := range ret { ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :2"}, nil) } return &valkeyresults{s: ret} } for i := range ret { ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Commands()[1]}, nil) } return &valkeyresults{s: ret} }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } for i := 0; i < 10; i++ { if v, err := client.DoMulti(context.Background(), client.B().Get().Key("a").Build())[0].ToString(); err != nil || v != "a" { t.Fatalf("unexpected resp %v %v", v, err) } } if atomic.LoadInt64(&count) != 1 { t.Fatalf("unexpected count %v", count) } }) t.Run("slot moved DoMulti transactions", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '+', 
string: "4"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '+', string: "7"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "2"}, {typ: '+', string: "3"}, }}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "5"}, {typ: '+', string: "6"}, }}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("5{t}").Build(), client.B().Get().Key("6{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("7{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) 
{ t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[5].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[6].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[10].ToString(); err != nil || v != "7" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti transactions ASKING", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '+', string: "4"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '+', string: "7"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), 
newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "2"}, {typ: '+', string: "3"}, }}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "5"}, {typ: '+', string: "6"}, }}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("5{t}").Build(), client.B().Get().Key("6{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("7{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[5].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[6].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { 
t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[10].ToString(); err != nil || v != "7" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti except transactions", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "2"}, {typ: '+', string: "3"}, }}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "5"}, {typ: '+', string: "6"}, }}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "4"}, nil), newResult(ValkeyMessage{typ: '+', string: "7"}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), 
client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("5{t}").Build(), client.B().Get().Key("6{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("7{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[5].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[6].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[10].ToString(); err != nil || v != "7" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti except transactions ASKING", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), 
newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "2"}, {typ: '+', string: "3"}, }}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "5"}, {typ: '+', string: "6"}, }}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "4"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "7"}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("5{t}").Build(), client.B().Get().Key("6{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("7{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { 
t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[5].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[6].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[10].ToString(); err != nil || v != "7" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti transactions mixed", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '+', string: "7"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ 
newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "2"}, {typ: '+', string: "3"}, }}, nil), newResult(ValkeyMessage{typ: '+', string: "4"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "5"}, {typ: '+', string: "6"}, }}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("5{t}").Build(), client.B().Get().Key("6{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("7{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[5].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[6].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[7].ToString(); err != nil || v != 
"QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[10].ToString(); err != nil || v != "7" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti transactions mixed ASKING", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '+', string: "7"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "2"}, {typ: '+', string: "3"}, }}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "4"}, nil), 
newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '+', string: "QUEUED"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "5"}, {typ: '+', string: "6"}, }}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("5{t}").Build(), client.B().Get().Key("6{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("7{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[4].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"2", "3"}) { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[5].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[6].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[7].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[8].ToString(); err != nil || v != "QUEUED" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[9].AsStrSlice(); err != nil || !reflect.DeepEqual(v, []string{"5", "6"}) { t.Fatalf("unexpected resp %v %v", v, 
err) } if v, err := resps[10].ToString(); err != nil || v != "7" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti transactions edge cases 1", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "4"}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Multi().Build(), // nested multi client.B().Get().Key("2{t}").Build(), client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if err := resps[2].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { t.Fatalf("unexpected err %v", err) } if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { t.Fatalf("unexpected err 
%v", err) } if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { t.Fatalf("unexpected err %v", err) } if err := resps[5].Error(); err == nil || !strings.Contains(err.Error(), "EXECABORT") { t.Fatalf("unexpected err %v", err) } if v, err := resps[6].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti transactions edge cases 2", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "EXECABORT"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), }} case 2: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "4"}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), client.B().Multi().Build(), // nested multi client.B().Get().Key("3{t}").Build(), client.B().Exec().Build(), client.B().Get().Key("4{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if err := 
resps[2].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { t.Fatalf("unexpected err %v", err) } if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") { t.Fatalf("unexpected err %v", err) } if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") { t.Fatalf("unexpected err %v", err) } if err := resps[5].Error(); err == nil || !strings.Contains(err.Error(), "EXECABORT") { t.Fatalf("unexpected err %v", err) } if v, err := resps[6].ToString(); err != nil || v != "4" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot moved DoMulti transactions edge cases 3", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { switch atomic.AddInt64(&count, 1) { case 1: return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "1"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), newResult(ValkeyMessage{typ: '-', string: "ERR Command not allowed inside a transaction"}, nil), newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil), }} } return nil }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } resps := client.DoMulti( context.Background(), client.B().Get().Key("1{t}").Build(), client.B().Multi().Build(), client.B().Get().Key("2{t}").Build(), client.B().Multi().Build(), // nested multi client.B().Get().Key("3{t}").Build(), ) if v, err := resps[0].ToString(); err != nil || v != "1" { t.Fatalf("unexpected resp %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "OK" { t.Fatalf("unexpected resp %v %v", v, err) } if err := resps[2].Error(); err == nil || 
!strings.Contains(err.Error(), "MOVED 0 :1") {
			t.Fatalf("unexpected err %v", err)
		}
		if err := resps[3].Error(); err == nil || !strings.Contains(err.Error(), "Command not allowed inside a transaction") {
			t.Fatalf("unexpected err %v", err)
		}
		if err := resps[4].Error(); err == nil || !strings.Contains(err.Error(), "MOVED 0 :1") {
			t.Fatalf("unexpected err %v", err)
		}
	})

	// The mock answers the first three DoMulti calls with "MOVED 0 :1" for every
	// command and afterwards echoes each command's second token. The test expects
	// the SET/GET pair to come back as "a" and "b".
	t.Run("slot moved DoMulti (multi)", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					return slotsMultiResp
				}, DoMultiFn: func(multi ...Completed) *valkeyresults {
					ret := make([]ValkeyResult, len(multi))
					if atomic.AddInt64(&count, 1) <= 3 {
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil)
						}
						return &valkeyresults{s: ret}
					}
					for i := range ret {
						ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Commands()[1]}, nil)
					}
					return &valkeyresults{s: ret}
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		for i, resp := range client.DoMulti(context.Background(), client.B().Set().Key("a").Value("a").Build(), client.B().Get().Key("b").Build()) {
			if v, err := resp.ToString(); err != nil {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 0 && v != "a" {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 1 && v != "b" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
	})

	// Same shape as the subtest above, but the first two DoMulti calls answer
	// "TRYAGAIN" instead of MOVED.
	// NOTE(review): the assertion ignores any error on the first response and
	// never checks its value; only the second response is required to be "b".
	// Confirm that this leniency is intended.
	t.Run("slot moved DoMulti (multi) TRYAGAIN", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					return slotsMultiResp
				}, DoMultiFn: func(multi ...Completed) *valkeyresults {
					ret := make([]ValkeyResult, len(multi))
					if atomic.AddInt64(&count, 1) <= 2 {
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '-', string: "TRYAGAIN"}, nil)
						}
						return &valkeyresults{s: ret}
					}
					for i := range ret {
						ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Commands()[1]}, nil)
					}
					return &valkeyresults{s: ret}
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		for i, resp := range client.DoMulti(context.Background(), client.B().Set().Key("a").Value("a").Build(), client.B().Get().Key("b").Build()) {
			if v, err := resp.ToString(); err != nil && i != 0 && !strings.Contains(err.Error(), "TRYAGAIN") {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 1 && v != "b" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
	})

	// The first three non-"CLUSTER SLOTS" commands are answered with
	// "MOVED 0 :2"; later ones return "b". The conn callback counts in check how
	// often it is invoked with dst == ":2", and the test requires that counter to
	// be non-zero once the GET has succeeded.
	t.Run("slot moved new", func(t *testing.T) {
		var count, check int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				if dst == ":2" {
					atomic.AddInt64(&check, 1)
				}
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
						return slotsMultiResp
					}
					if atomic.AddInt64(&count, 1) <= 3 {
						return newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :2"}, nil)
					}
					return newResult(ValkeyMessage{typ: '+', string: "b"}, nil)
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" {
			t.Fatalf("unexpected resp %v %v", v, err)
		}
		// Report the atomically loaded value; check is updated with
		// atomic.AddInt64 in the callback, so it must not be re-read plainly.
		if got := atomic.LoadInt64(&check); got == 0 {
			t.Fatalf("unexpected check value %v", got)
		}
	})

	// DoMulti variant with a single command: the first three DoMulti calls get
	// "MOVED 0 :2", later calls echo the key. The callback must have been
	// invoked with dst == ":2" at least once.
	t.Run("slot moved new (multi 1)", func(t *testing.T) {
		var count, check int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				if dst == ":2" {
					atomic.AddInt64(&check, 1)
				}
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					return slotsMultiResp
				}, DoMultiFn: func(multi ...Completed) *valkeyresults {
					ret := make([]ValkeyResult, len(multi))
					if atomic.AddInt64(&count, 1) <= 3 {
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :2"}, nil)
						}
						return &valkeyresults{s: ret}
					}
					for i := range ret {
						ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Commands()[1]}, nil)
					}
					return &valkeyresults{s: ret}
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		if v, err := client.DoMulti(context.Background(), client.B().Get().Key("a").Build())[0].ToString(); err != nil || v != "a" {
			t.Fatalf("unexpected resp %v %v", v, err)
		}
		// Report the atomically loaded value (see the callback's atomic.AddInt64).
		if got := atomic.LoadInt64(&check); got == 0 {
			t.Fatalf("unexpected check value %v", got)
		}
	})

	// DoMulti variant with two commands (SET a / GET b): same mock behaviour as
	// "multi 1", with both replies verified and the ":2" counter required to be
	// non-zero.
	t.Run("slot moved new (multi 2)", func(t *testing.T) {
		var count, check int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				if dst == ":2" {
					atomic.AddInt64(&check, 1)
				}
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					return slotsMultiResp
				}, DoMultiFn: func(multi ...Completed) *valkeyresults {
					ret := make([]ValkeyResult, len(multi))
					if atomic.AddInt64(&count, 1) <= 3 {
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :2"}, nil)
						}
						return &valkeyresults{s: ret}
					}
					for i := range ret {
						ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Commands()[1]}, nil)
					}
					return &valkeyresults{s: ret}
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		for i, resp := range client.DoMulti(context.Background(), client.B().Set().Key("a").Value("a").Build(), client.B().Get().Key("b").Build()) {
			if v, err := resp.ToString(); err != nil {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 0 && v != "a" {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 1 && v != "b" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
		// Report the atomically loaded value (see the callback's atomic.AddInt64).
		if got := atomic.LoadInt64(&check); got == 0 {
			t.Fatalf("unexpected check value %v", got)
		}
	})

	t.Run("slot moved new (multi 2) TRYAGAIN", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					return slotsMultiResp
				}, DoMultiFn: func(multi ...Completed) *valkeyresults {
					ret := make([]ValkeyResult, len(multi))
					if atomic.AddInt64(&count, 1) <= 2 {
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '-', string: "TRYAGAIN"}, nil)
						}
						return &valkeyresults{s: ret}
					}
					for i := range ret {
						ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Commands()[1]}, nil)
					}
					return &valkeyresults{s: ret}
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		for i, resp := range client.DoMulti(context.Background(), client.B().Set().Key("a").Value("a").Build(), client.B().Get().Key("b").Build()) {
			if v, err := resp.ToString(); err != nil && i != 0 && !strings.Contains(err.Error(), "TRYAGAIN") {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 1 && v != "b" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
	})

	// DoCache is answered with "MOVED 0 :1" for the first three calls and with
	// "b" afterwards; the test expects the cached GET to end up returning "b".
	t.Run("slot moved (cache)", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{
					DoFn: func(cmd Completed) ValkeyResult {
						return slotsMultiResp
					},
					DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult {
						if atomic.AddInt64(&count, 1) <= 3 {
							return newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil)
						}
						return newResult(ValkeyMessage{typ: '+', string: "b"}, nil)
					},
				}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		if v, err := client.DoCache(context.Background(), client.B().Get().Key("a").Cache(), 100).ToString(); err != nil || v != "b" {
			t.Fatalf("unexpected resp %v %v", v, err)
		}
	})

	// Conns whose dst contains ":0" always answer DoCache with "MOVED 0 :2" and
	// bump count; every other conn answers "b". Ten identical DoCache calls must
	// all yield "b" while count stays at exactly 1, i.e. only one call may reach
	// a ":0" conn.
	t.Run("slot moved (cache) redirect once", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{
					DoFn: func(cmd Completed) ValkeyResult {
						return slotsMultiResp
					},
					DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult {
						if strings.Contains(dst, ":0") {
							atomic.AddInt64(&count, 1)
							return newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :2"}, nil)
						}
						return newResult(ValkeyMessage{typ: '+', string: "b"}, nil)
					},
				}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		for i := 0; i < 10; i++ {
			if v, err := client.DoCache(context.Background(), client.B().Get().Key("a").Cache(), 100).ToString(); err != nil || v != "b" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
		// Report the atomically loaded value; count is updated with
		// atomic.AddInt64 inside the mock, so it must not be re-read plainly.
		if got := atomic.LoadInt64(&count); got != 1 {
			t.Fatalf("unexpected count %v", got)
		}
	})

	// DoMultiCache with one command: the first three calls answer "MOVED 0 :1",
	// later calls echo the key, and the test expects "a".
	t.Run("slot moved (cache multi 1)", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					return slotsMultiResp
				}, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults {
					ret := make([]ValkeyResult, len(multi))
					if atomic.AddInt64(&count, 1) <= 3 {
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil)
						}
						return &valkeyresults{s: ret}
					}
					for i := range ret {
						ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Cmd.Commands()[1]}, nil)
					}
					return &valkeyresults{s: ret}
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		if v, err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100))[0].ToString(); err != nil || v != "a" {
			t.Fatalf("unexpected resp %v %v", v, err)
		}
	})

	// DoMultiCache counterpart of "slot moved (cache) redirect once": conns whose
	// dst contains ":0" always answer "MOVED 0 :2" and bump count, others echo
	// the key. After ten calls count must be exactly 1.
	t.Run("slot moved (cache multi 1) redirect once", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{
					DoFn: func(cmd Completed) ValkeyResult {
						return slotsMultiResp
					},
					DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults {
						ret := make([]ValkeyResult, len(multi))
						if strings.Contains(dst, ":0") {
							atomic.AddInt64(&count, 1)
							for i := range ret {
								ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :2"}, nil)
							}
							return &valkeyresults{s: ret}
						}
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Cmd.Commands()[1]}, nil)
						}
						return &valkeyresults{s: ret}
					},
				}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		for i := 0; i < 10; i++ {
			if v, err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100))[0].ToString(); err != nil || v != "a" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
		// Report the atomically loaded value (count is written with
		// atomic.AddInt64 inside the mock).
		if got := atomic.LoadInt64(&count); got != 1 {
			t.Fatalf("unexpected count %v", got)
		}
	})

	// DoMultiCache with two commands: the first three calls answer "MOVED 0 :1",
	// later calls echo each key, and the test expects "a" then "b".
	t.Run("slot moved (cache multi 2)", func(t *testing.T) {
		var count int64
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{DoFn: func(cmd Completed) ValkeyResult {
					return slotsMultiResp
				}, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults {
					ret := make([]ValkeyResult, len(multi))
					if atomic.AddInt64(&count, 1) <= 3 {
						for i := range ret {
							ret[i] = newResult(ValkeyMessage{typ: '-', string: "MOVED 0 :1"}, nil)
						}
						return &valkeyresults{s: ret}
					}
					for i := range ret {
						ret[i] = newResult(ValkeyMessage{typ: '+', string: multi[i].Cmd.Commands()[1]}, nil)
					}
					return &valkeyresults{s: ret}
				}}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		for i, resp := range client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100), CT(client.B().Get().Key("b").Cache(), 100)) {
			if v, err := resp.ToString(); err != nil {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 0 && v != "a" {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 1 && v != "b" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
	})

	t.Run("slot asking", func(t *testing.T) {
		var count int64
		client, err
:= newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiResp } return newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil) }, DoMultiFn: func(multi ...Completed) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{{}, newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil)}} } return &valkeyresults{s: []ValkeyResult{{}, newResult(ValkeyMessage{typ: '+', string: "b"}, nil)}} }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot asking DoMulti (single)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { ret := make([]ValkeyResult, len(multi)) if atomic.AddInt64(&count, 1) <= 3 { for i := range ret { ret[i] = newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil) } return &valkeyresults{s: ret} } for i := 0; i < len(multi); i += 2 { ret[i] = newResult(ValkeyMessage{typ: '+', string: "OK"}, nil) ret[i+1] = newResult(ValkeyMessage{typ: '+', string: multi[i+1].Commands()[1]}, nil) } return &valkeyresults{s: ret} }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.DoMulti(context.Background(), client.B().Get().Key("a").Build())[0].ToString(); err != nil || v != "a" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot asking DoMulti (multi)", func(t *testing.T) { var count int64 client, err := 
newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { ret := make([]ValkeyResult, len(multi)) if atomic.AddInt64(&count, 1) <= 3 { for i := range ret { ret[i] = newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil) } return &valkeyresults{s: ret} } for i := 0; i < len(multi); i += 2 { ret[i] = newResult(ValkeyMessage{typ: '+', string: "OK"}, nil) ret[i+1] = newResult(ValkeyMessage{typ: '+', string: multi[i+1].Commands()[1]}, nil) } return &valkeyresults{s: ret} }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } for i, resp := range client.DoMulti(context.Background(), client.B().Get().Key("a").Build(), client.B().Get().Key("b").Build()) { if v, err := resp.ToString(); err != nil { t.Fatalf("unexpected resp %v %v", v, err) } else if i == 0 && v != "a" { t.Fatalf("unexpected resp %v %v", v, err) } else if i == 1 && v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } } }) t.Run("slot asking (cache)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil) }, DoMultiFn: func(multi ...Completed) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil)}} } return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{}, {typ: '+', string: "b"}}}, nil)}} }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if 
v, err := client.DoCache(context.Background(), client.B().Get().Key("a").Cache(), 100).ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot asking (cache multi 1)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil)}} }, DoMultiFn: func(multi ...Completed) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil)}} } return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{}, {typ: '+', string: "b"}}}, nil)}} }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100))[0].ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot asking (cache multi 2)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil)}} }, DoMultiFn: func(multi ...Completed) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{ {}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, nil), {}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '-', string: "ASK 0 :1"}, 
nil), }} } return &valkeyresults{s: []ValkeyResult{ {}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{}, {}, {typ: '+', string: multi[4].Commands()[1]}}}, nil), {}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{}, {}, {typ: '+', string: multi[10].Commands()[1]}}}, nil), }} }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } for i, resp := range client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100), CT(client.B().Get().Key("b").Cache(), 100)) { if v, err := resp.ToString(); err != nil { t.Fatalf("unexpected resp %v %v", v, err) } else if i == 0 && v != "a" { t.Fatalf("unexpected resp %v %v", v, err) } else if i == 1 && v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } } }) t.Run("slot try again", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiResp } if atomic.AddInt64(&count, 1) <= 3 { return newResult(ValkeyMessage{typ: '-', string: "TRYAGAIN"}, nil) } return newResult(ValkeyMessage{typ: '+', string: "b"}, nil) }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.Do(context.Background(), client.B().Get().Key("a").Build()).ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot try again DoMulti 1", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: 
'-', string: "TRYAGAIN"}, nil)}} } ret := make([]ValkeyResult, len(multi)) ret[0] = newResult(ValkeyMessage{typ: '+', string: "b"}, nil) return &valkeyresults{s: ret} }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.DoMulti(context.Background(), client.B().Get().Key("a").Build())[0].ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot try again DoMulti 2", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiFn: func(multi ...Completed) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "TRYAGAIN"}, nil)}} } ret := make([]ValkeyResult, len(multi)) ret[0] = newResult(ValkeyMessage{typ: '+', string: multi[0].Commands()[1]}, nil) return &valkeyresults{s: ret} }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } for i, resp := range client.DoMulti(context.Background(), client.B().Get().Key("a").Build(), client.B().Get().Key("b").Build()) { if v, err := resp.ToString(); err != nil { t.Fatalf("unexpected resp %v %v", v, err) } else if i == 0 && v != "a" { t.Fatalf("unexpected resp %v %v", v, err) } else if i == 1 && v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } } }) t.Run("slot try again (cache)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{ DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult { if atomic.AddInt64(&count, 1) <= 3 { return newResult(ValkeyMessage{typ: '-', string: "TRYAGAIN"}, nil) } return newResult(ValkeyMessage{typ: '+', 
string: "b"}, nil) }, } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.DoCache(context.Background(), client.B().Get().Key("a").Cache(), 100).ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot try again (cache multi 1)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { return slotsMultiResp }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "TRYAGAIN"}, nil)}} } return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "b"}, nil)}} }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } if v, err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a").Cache(), 100))[0].ToString(); err != nil || v != "b" { t.Fatalf("unexpected resp %v %v", v, err) } }) t.Run("slot try again (cache multi 2)", func(t *testing.T) { var count int64 client, err := newClusterClient( &ClientOption{InitAddress: []string{":0"}}, func(dst string, opt *ClientOption) conn { return &mockConn{DoFn: func(cmd Completed) ValkeyResult { if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { return slotsMultiResp } return shardsMultiResp }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { if atomic.AddInt64(&count, 1) <= 3 { return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "TRYAGAIN"}, nil)}} } return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: multi[0].Cmd.Commands()[1]}, nil)}} }} }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } for i, resp := range client.DoMultiCache(context.Background(), 
CT(client.B().Get().Key("a").Cache(), 100), CT(client.B().Get().Key("b").Cache(), 100)) {
			if v, err := resp.ToString(); err != nil {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 0 && v != "a" {
				t.Fatalf("unexpected resp %v %v", v, err)
			} else if i == 1 && v != "b" {
				t.Fatalf("unexpected resp %v %v", v, err)
			}
		}
	})
}

// TestClusterClientRetry runs the shared SetupClientRetry suite against a
// cluster client whose every connection is the mock supplied by the suite.
// "CLUSTER SLOTS" is answered with slotsMultiResp so the client can initialise.
func TestClusterClientRetry(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	SetupClientRetry(t, func(m *mockConn) Client {
		m.DoOverride = map[string]func(cmd Completed) ValkeyResult{
			"CLUSTER SLOTS": func(cmd Completed) ValkeyResult { return slotsMultiResp },
		}
		c, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn {
				return m
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		return c
	})
}

// TestClusterClientReplicaOnly_PickReplica checks that, with ReplicaOnly set,
// the boundary slots of both shards in slotsMultiResp are mapped to the
// replica connection of their shard rather than to the master.
func TestClusterClientReplicaOnly_PickReplica(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	m := &mockConn{
		DoFn: func(cmd Completed) ValkeyResult {
			if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
				return slotsMultiResp
			}
			return ValkeyResult{}
		},
	}

	client, err := newClusterClient(
		&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true},
		func(dst string, opt *ClientOption) conn {
			// Hand out a distinct copy per address so connections can be told apart.
			copiedM := *m
			return &copiedM
		},
		newRetryer(defaultRetryDelayFn),
	)
	if err != nil {
		t.Fatalf("unexpected err %v", err)
	}

	t.Run("replicas should be picked", func(t *testing.T) {
		if client.pslots[0] != client.conns["127.0.1.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 0")
		}
		if client.pslots[8192] != client.conns["127.0.1.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 8192")
		}
		if client.pslots[8193] != client.conns["127.0.3.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 8193")
		}
		if client.pslots[16383] != client.conns["127.0.3.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 16383")
		}
	})
}

// TestClusterClientReplicaOnly_PickMasterIfNoReplica exercises ReplicaOnly slot
// assignment for a topology with one replica per shard and for a topology with
// three replicas per shard.
//
// NOTE(review): despite the name, neither subtest feeds a topology without
// replicas (e.g. slotsMultiRespWithoutReplicas); confirm whether the
// master-fallback path is meant to be covered here.
func TestClusterClientReplicaOnly_PickMasterIfNoReplica(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	t.Run("replicas should be picked", func(t *testing.T) {
		m := &mockConn{
			DoFn: func(cmd Completed) ValkeyResult {
				if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
					return slotsMultiResp
				}
				return ValkeyResult{}
			},
		}

		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true},
			func(dst string, opt *ClientOption) conn {
				copiedM := *m
				return &copiedM
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}

		if client.pslots[0] != client.conns["127.0.1.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 0")
		}
		if client.pslots[8192] != client.conns["127.0.1.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 8192")
		}
		if client.pslots[8193] != client.conns["127.0.3.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 8193")
		}
		if client.pslots[16383] != client.conns["127.0.3.1:1"].conn {
			t.Fatalf("unexpected replica node assigned to slot 16383")
		}
	})

	t.Run("distributed to replicas", func(t *testing.T) {
		m := &mockConn{
			DoFn: func(cmd Completed) ValkeyResult {
				if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
					return slotsMultiRespWithMultiReplicas
				}
				return ValkeyResult{}
			},
		}

		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true},
			func(dst string, opt *ClientOption) conn {
				copiedM := *m
				return &copiedM
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}

		// Every slot of the first shard must land on one of its three replicas.
		for slot := 0; slot < 8193; slot++ {
			if client.pslots[slot] == client.conns["127.0.0.2:1"].conn {
				continue
			}
			if client.pslots[slot] == client.conns["127.0.0.3:2"].conn {
				continue
			}
			if client.pslots[slot] == client.conns["127.0.0.4:3"].conn {
				continue
			}
			t.Fatalf("unexpected replica node assigned to slot %d", slot)
		}

		// Likewise for the second shard.
		for slot := 8193; slot < 16384; slot++ {
			if client.pslots[slot] == client.conns["127.0.1.2:1"].conn {
				continue
			}
			if client.pslots[slot] == client.conns["127.0.1.3:2"].conn {
				continue
			}
			if client.pslots[slot] == client.conns["127.0.1.4:3"].conn {
				continue
			}
			t.Fatalf("unexpected replica node assigned to slot %d", slot)
		}
	})
}

// TestClusterShardsParsing checks parseShards against shardsRespTls: which
// node becomes the map key, which port ends up in each node address for both
// values of the final boolean argument, and that the first node of every group
// is the one the group is keyed by.
func TestClusterShardsParsing(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	t.Run("master selection", func(t *testing.T) {
		result := parseShards(shardsRespTls.val, "127.0.0.1:5", true)
		if len(result) != 1 {
			t.Fatalf("unexpected result %v", result)
		}
		if _, ok := result["127.0.1.1:1"]; !ok {
			t.Fatal("unexpected master node")
		}
	})

	t.Run("port selection", func(t *testing.T) {
		result := parseShards(shardsRespTls.val, "127.0.0.1:5", true)
		if len(result) != 1 {
			t.Fatalf("unexpected result %v", result)
		}
		for _, val := range result {
			// Sort by address so the expectation does not depend on node order.
			nodes := val.nodes
			sort.Slice(nodes, func(i, j int) bool {
				return nodes[i].Addr < nodes[j].Addr
			})
			if len(nodes) != 3 ||
				nodes[0].Addr != "127.0.1.1:1" ||
				nodes[1].Addr != "127.0.2.1:2" ||
				nodes[2].Addr != "127.0.3.1:3" {
				t.Fatalf("unexpected nodes %v", nodes)
			}
		}

		result = parseShards(shardsRespTls.val, "127.0.0.1:5", false)
		if len(result) != 1 {
			t.Fatalf("unexpected result %v", result)
		}
		for _, val := range result {
			nodes := val.nodes
			sort.Slice(nodes, func(i, j int) bool {
				return nodes[i].Addr < nodes[j].Addr
			})
			if len(nodes) != 3 ||
				nodes[0].Addr != "127.0.1.1:0" ||
				nodes[1].Addr != "127.0.2.1:0" ||
				nodes[2].Addr != "127.0.3.1:3" {
				t.Fatalf("unexpected nodes %v", nodes)
			}
		}
	})

	t.Run("master position", func(t *testing.T) {
		result := parseShards(shardsRespTls.val, "127.0.0.1:5", true)
		if len(result) != 1 {
			t.Fatalf("unexpected result %v", result)
		}
		for master, group := range result {
			if len(group.nodes) == 0 || group.nodes[0].Addr != master {
				t.Fatalf("unexpected first node %v", group)
			}
		}
	})
}

// TestConnectToNonAvailableCluster calls NewClient concurrently from four
// goroutines, one hundred times each, against three local addresses and
// expects every call to return an error.
//
// https://github.com/redis/rueidis/issues/543
func TestConnectToNonAvailableCluster(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for attempt := 0; attempt < 100; attempt++ {
				_, err := NewClient(ClientOption{
					InitAddress: []string{"127.0.0.1:3000", "127.0.0.1:3001", "127.0.0.1:3002"},
				})
				if err == nil {
					// Errorf (not Fatalf): this is not the test goroutine.
					t.Errorf("expected connect error")
				}
			}
		}()
	}
	wg.Wait()
}

// TestClusterTopologyRefreshment covers the ShardsRefreshInterval option: no
// further mock calls when the interval is zero, and the resulting connection
// table after the mock starts reporting an unchanged or a changed topology.
func TestClusterTopologyRefreshment(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	// awaitRefresh creates a client with a one-second refresh interval. The mock
	// answers every command with initial until call number switchAt, and with
	// updated from that call on. After call number doneAt has been answered the
	// client is closed and its connection table must hold exactly the addresses
	// listed in want.
	awaitRefresh := func(t *testing.T, initial, updated ValkeyResult, switchAt, doneAt int64, want ...string) {
		t.Helper()
		var (
			callCount int64
			closeOnce sync.Once
		)
		refreshWaitCh := make(chan struct{})
		cli, err := newClusterClient(
			&ClientOption{
				InitAddress: []string{"127.0.0.1:0"},
				ClusterOption: ClusterOption{
					ShardsRefreshInterval: time.Second,
				},
			},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{
					DoFn: func(cmd Completed) ValkeyResult {
						c := atomic.AddInt64(&callCount, 1)
						if c >= doneAt {
							// Later calls also reach this branch; close only once.
							closeOnce.Do(func() { close(refreshWaitCh) })
						}
						if c >= switchAt {
							return updated
						}
						return initial
					},
				}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}

		<-refreshWaitCh
		cli.Close()

		cli.mu.Lock()
		conns := cli.conns
		cli.mu.Unlock()

		if len(conns) != len(want) {
			t.Fatalf("unexpected conns %v", conns)
		}
		for _, addr := range want {
			if _, ok := conns[addr]; !ok {
				t.Fatalf("unexpected conns %v", conns)
			}
		}
	}

	t.Run("no refreshment", func(t *testing.T) {
		var callCount int64
		cc, err := newClusterClient(
			&ClientOption{
				InitAddress: []string{"127.0.0.1:0"},
				ClusterOption: ClusterOption{
					ShardsRefreshInterval: 0,
				},
			},
			func(dst string, opt *ClientOption) conn {
				return &mockConn{
					DoFn: func(cmd Completed) ValkeyResult {
						// initial call
						if atomic.CompareAndSwapInt64(&callCount, 0, 1) {
							return singleSlotResp
						}
						// Errorf rather than Fatalf: the mock may be invoked from a
						// goroutine other than the one running the test, where
						// FailNow must not be called.
						t.Errorf("unexpected call")
						return singleSlotResp
					},
				}
			},
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		defer cc.Close()

		time.Sleep(3 * time.Second)

		// verify that no refreshment is called
		if n := atomic.LoadInt64(&callCount); n != 1 {
			t.Fatalf("unexpected call count %d", n)
		}
	})

	t.Run("nothing changed", func(t *testing.T) {
		awaitRefresh(t, singleSlotResp, singleSlotResp, 3, 3,
			"127.0.0.1:0")
	})

	t.Run("replicas are changed", func(t *testing.T) {
		awaitRefresh(t, singleSlotResp, slotsResp, 3, 6,
			"127.0.0.1:0", "127.0.1.1:1")
	})

	t.Run("shards are changed", func(t *testing.T) {
		awaitRefresh(t, singleSlotResp, slotsMultiRespWithoutReplicas, 3, 6,
			"127.0.0.1:0", "127.0.1.1:0")
	})
}

// TestClusterClientLoadingRetry checks that a LOADING error reply on the first
// attempt is followed by a successful result for Do, DoMulti, DoCache,
// DoMultiCache and the dedicated-client variants, and that a non-LOADING error
// reply is returned without a second attempt.
func TestClusterClientLoadingRetry(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	// setup returns a cluster client whose every connection is the same mock.
	setup := func() (*clusterClient, *mockConn) {
		m := &mockConn{
			DoFn: func(cmd Completed) ValkeyResult {
				if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
					return slotsMultiResp
				}
				return ValkeyResult{}
			},
		}
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn { return m },
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		return client, m
	}

	t.Run("Do Retry on Loading", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoFn = func(cmd Completed) ValkeyResult {
			if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
				return slotsMultiResp
			}
			attempts++
			if attempts == 1 {
				return newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)
			}
			return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
		}

		if v, err := client.Do(context.Background(), client.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if attempts != 2 {
			t.Fatalf("expected 2 attempts, got %v", attempts)
		}
	})

	t.Run("Do not retry on non-loading errors", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoFn = func(cmd Completed) ValkeyResult {
			if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
				return slotsMultiResp
			}
			attempts++
			if attempts == 1 {
				return newResult(ValkeyMessage{typ: '-', string: "ERR some other error"}, nil)
			}
			return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
		}

		if err := client.Do(context.Background(), client.B().Get().Key("test").Build()).Error(); err == nil {
			t.Fatal("expected error but got nil")
		}
		if attempts != 1 {
			t.Fatalf("unexpected attempts %v, expected no retry", attempts)
		}
	})

	// NOTE(review): the remaining subtests count attempts but, unlike
	// "Do Retry on Loading", never assert the final count.
	t.Run("DoMulti Retry on Loading", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoMultiFn = func(multi ...Completed) *valkeyresults {
			attempts++
			if attempts == 1 {
				return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)}}
			}
			return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
		}

		cmd := client.B().Get().Key("test").Build()
		resps := client.DoMulti(context.Background(), cmd)
		if len(resps) != 1 {
			t.Fatalf("unexpected response length %v", len(resps))
		}
		if v, err := resps[0].ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoCache Retry on Loading", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) ValkeyResult {
			attempts++
			if attempts == 1 {
				return newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)
			}
			return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
		}

		cmd := client.B().Get().Key("test").Cache()
		if v, err := client.DoCache(context.Background(), cmd, time.Minute).ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMultiCache Retry on Loading", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoMultiCacheFn = func(multi ...CacheableTTL) *valkeyresults {
			attempts++
			if attempts == 1 {
				return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)}}
			}
			return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
		}

		cmd := client.B().Get().Key("test").Cache()
		resps := client.DoMultiCache(context.Background(), CT(cmd, time.Minute))
		if len(resps) != 1 {
			t.Fatalf("unexpected response length %v", len(resps))
		}
		if v, err := resps[0].ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("Dedicated Do Retry on Loading", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoFn = func(cmd Completed) ValkeyResult {
			if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
				return slotsMultiResp
			}
			attempts++
			if attempts == 1 {
				return newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)
			}
			return newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)
		}
		// The acquired wire shares the connection's handler.
		m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }

		err := client.Dedicated(func(c DedicatedClient) error {
			if v, err := c.Do(context.Background(), c.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
				t.Fatalf("unexpected response %v %v", v, err)
			}
			return nil
		})
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
	})

	t.Run("Dedicated DoMulti Retry on Loading", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoMultiFn = func(multi ...Completed) *valkeyresults {
			attempts++
			if attempts == 1 {
				return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "LOADING Valkey is loading the dataset in memory"}, nil)}}
			}
			return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
		}
		m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }

		err := client.Dedicated(func(c DedicatedClient) error {
			resps := c.DoMulti(context.Background(), c.B().Get().Key("test").Build())
			if len(resps) != 1 {
				t.Fatalf("unexpected response length %v", len(resps))
			}
			if v, err := resps[0].ToString(); err != nil || v != "OK" {
				t.Fatalf("unexpected response %v %v", v, err)
			}
			return nil
		})
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
	})
}

// TestClusterClientMovedRetry checks that DoMulti yields the successful reply
// when the mock answers the first attempt with a MOVED or an ASK error.
func TestClusterClientMovedRetry(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	// setup returns a cluster client whose every connection is the same mock.
	setup := func() (*clusterClient, *mockConn) {
		m := &mockConn{
			DoFn: func(cmd Completed) ValkeyResult {
				if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
					return slotsMultiResp
				}
				return ValkeyResult{}
			},
		}
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn { return m },
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		return client, m
	}

	t.Run("DoMulti Retry on MOVED", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoMultiFn = func(multi ...Completed) *valkeyresults {
			attempts++
			if attempts == 1 {
				return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "MOVED 0 127.0.0.1"}, nil)}}
			}
			return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
		}

		cmd := client.B().Set().Key("test").Value(`test`).Build()
		resps := client.DoMulti(context.Background(), cmd)
		if len(resps) != 1 {
			t.Fatalf("unexpected response length %v", len(resps))
		}
		if v, err := resps[0].ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMulti Retry on ASK", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoMultiFn = func(multi ...Completed) *valkeyresults {
			attempts++
			if attempts == 1 {
				return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "ASK 0 127.0.0.1"}, nil)}}
			}
			// Two replies on the second attempt; presumably one is for the ASKING
			// command sent ahead of the redirected command (confirm against the client).
			return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil)}}
		}

		cmd := client.B().Set().Key("test").Value(`test`).Build()
		resps := client.DoMulti(context.Background(), cmd)
		if len(resps) != 1 {
			t.Fatalf("unexpected response length %v", len(resps))
		}
		if v, err := resps[0].ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})
}

// TestClusterClientCacheASKRetry checks DoCache and DoMultiCache when the
// cached path always answers with an ASK error: the mock's DoMultiFn answers
// ASK once more and then succeeds, and exactly two DoMultiFn calls are expected.
func TestClusterClientCacheASKRetry(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	// setup returns a cluster client whose every connection is the same mock.
	setup := func() (*clusterClient, *mockConn) {
		m := &mockConn{
			DoFn: func(cmd Completed) ValkeyResult {
				if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
					return slotsMultiResp
				}
				return ValkeyResult{}
			},
		}
		client, err := newClusterClient(
			&ClientOption{InitAddress: []string{":0"}},
			func(dst string, opt *ClientOption) conn { return m },
			newRetryer(defaultRetryDelayFn),
		)
		if err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		return client, m
	}

	t.Run("DoCache Retry on ASK", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) ValkeyResult {
			return newResult(ValkeyMessage{typ: '-', string: "ASK 0 :0"}, nil)
		}
		m.DoMultiFn = func(multi ...Completed) *valkeyresults {
			attempts++
			if attempts == 1 {
				return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, newResult(ValkeyMessage{typ: '-', string: "ASK 0 :0"}, nil), newResult(ValkeyMessage{typ: '_'}, nil)}}
			}
			return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{}, {}, {}, {}, {}, {typ: '+', string: "OK"}}}, nil)}}
		}

		resp := client.DoCache(context.Background(), client.B().Get().Key("a1").Cache(), 10*time.Second)
		if v, err := resp.ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if attempts != 2 {
			t.Fatalf("expected 2 attempts, got %v", attempts)
		}
	})

	t.Run("DoMultiCache Retry on ASK", func(t *testing.T) {
		client, m := setup()
		attempts := 0
		m.DoMultiCacheFn = func(multi ...CacheableTTL) *valkeyresults {
			return &valkeyresults{s: []ValkeyResult{newResult(ValkeyMessage{typ: '-', string: "ASK 0 :0"}, nil)}}
		}
		m.DoMultiFn = func(multi ...Completed) *valkeyresults {
			attempts++
			if attempts == 1 {
				return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, newResult(ValkeyMessage{typ: '-', string: "ASK 0 :0"}, nil), newResult(ValkeyMessage{typ: '_'}, nil)}}
			}
			return &valkeyresults{s: []ValkeyResult{{}, {}, {}, {}, {}, newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{}, {}, {}, {}, {}, {typ: '+', string: "OK"}}}, nil)}}
		}

		resps := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a1").Cache(), 10*time.Second))
		if v, err := resps[0].ToString(); err != nil || v != "OK" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if attempts != 2 {
			t.Fatalf("expected 2 attempts, got %v", attempts)
		}
	})
}

//gocyclo:ignore
func TestClusterClient_SendReadOperationToReplicaNodeWriteOperationToPrimaryNode(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	primaryNodeConn := &mockConn{
		DoOverride:
map[string]func(cmd Completed) ValkeyResult{ "CLUSTER SLOTS": func(cmd Completed) ValkeyResult { return slotsMultiResp }, "INFO": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "INFO"}, nil) }, "SET Do V": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "SET Do V"}, nil) }, "SET K2{a} V2{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "SET K2{a} V2{a}"}, nil) }, }, DoMultiFn: func(multi ...Completed) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "SET K1") { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) continue } if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "SET K2") { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil) continue } if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "MULTI") { resps[i] = newResult(ValkeyMessage{typ: '+', string: "MULTI"}, nil) continue } if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "EXEC") { resps[i] = newResult(ValkeyMessage{typ: '+', string: "EXEC"}, nil) continue } return &valkeyresults{ s: []ValkeyResult{}, } } return &valkeyresults{s: resps} }, } replicaNodeConn := &mockConn{ DoOverride: map[string]func(cmd Completed) ValkeyResult{ "GET Do": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET Do"}, nil) }, "GET K1{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil) }, "GET K2{a}": func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil) }, }, DoMultiFn: func(multi ...Completed) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { if strings.HasPrefix(strings.Join(cmd.Commands(), " "), "GET K1") { resps[i] = newResult(ValkeyMessage{typ: 
'+', string: strings.Join(cmd.Commands(), " ")}, nil) continue } return &valkeyresults{ s: []ValkeyResult{}, } } return &valkeyresults{s: resps} }, DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) ValkeyResult{ "GET DoCache": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET DoCache"}, nil) }, "GET K1{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil) }, "GET K2{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil) }, }, DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { resps := make([]ValkeyResult, len(multi)) for i, cmd := range multi { if strings.HasPrefix(strings.Join(cmd.Cmd.Commands(), " "), "GET K1") { resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil) continue } return &valkeyresults{ s: []ValkeyResult{}, } } return &valkeyresults{s: resps} }, } client, err := newClusterClient( &ClientOption{ InitAddress: []string{"127.0.0.1:0"}, SendToReplicas: func(cmd Completed) bool { return cmd.IsReadOnly() }, ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int { return 0 }, }, func(dst string, opt *ClientOption) conn { if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" { // primary node return primaryNodeConn } else { // replica node return replicaNodeConn } }, newRetryer(defaultRetryDelayFn), ) if err != nil { t.Fatalf("unexpected err %v", err) } t.Run("Do read operation", func(t *testing.T) { c := client.B().Get().Key("Do").Build() if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("Do write operation", func(t *testing.T) { c := client.B().Set().Key("Do").Value("V").Build() if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "SET Do V" { t.Fatalf("unexpected 
response %v %v", v, err) } }) t.Run("DoMulti Single Slot All Read Operations", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Build() c2 := client.B().Get().Key("K2{a}").Build() resps := client.DoMulti(context.Background(), c1, c2) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Single Slot Read Operation And Write Operation", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Build() c2 := client.B().Set().Key("K2{a}").Value("V2{a}").Build() resps := client.DoMulti(context.Background(), c1, c2) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "SET K2{a} V2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Single Slot Operations + Init Slot", func(t *testing.T) { c1 := client.B().Multi().Build() c2 := client.B().Set().Key("K1{a}").Value("V1{a}").Build() c3 := client.B().Set().Key("K2{a}").Value("V2{a}").Build() c4 := client.B().Exec().Build() resps := client.DoMulti(context.Background(), c1, c2, c3, c4) if v, err := resps[0].ToString(); err != nil || v != "MULTI" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "SET K1{a} V1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[2].ToString(); err != nil || v != "SET K2{a} V2{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[3].ToString(); err != nil || v != "EXEC" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) { defer func() { if err := recover(); err != panicMixCxSlot { t.Errorf("DoMulti should panic if Cross Slot + Init Slot") } }() c1 := client.B().Get().Key("K1{a}").Build() c2 := 
client.B().Get().Key("K1{b}").Build() c3 := client.B().Info().Build() client.DoMulti(context.Background(), c1, c2, c3) }) t.Run("DoMulti Multi Slot All Read Operations", func(t *testing.T) { multi := make([]Completed, 500) for i := 0; i < len(multi); i++ { multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() } resps := client.DoMulti(context.Background(), multi...) for i := 0; i < len(multi); i++ { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } }) t.Run("DoMulti Multi Slot Read & Write Operations", func(t *testing.T) { multi := make([]Completed, 500) for i := 0; i < len(multi); i++ { if i%2 == 0 { multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build() } else { multi[i] = client.B().Set().Key(fmt.Sprintf("K2{%d}", i)).Value(fmt.Sprintf("V2{%d}", i)).Build() } } resps := client.DoMulti(context.Background(), multi...) for i := 0; i < len(multi); i++ { if i%2 == 0 { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } else { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("SET K2{%d} V2{%d}", i, i) { t.Fatalf("unexpected response %v %v", v, err) } } } }) t.Run("DoCache Operation", func(t *testing.T) { c := client.B().Get().Key("DoCache").Cache() if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" { t.Fatalf("unexpected response %v %v", v, err) } }) t.Run("DoMultiCache Single Slot", func(t *testing.T) { c1 := client.B().Get().Key("K1{a}").Cache() c2 := client.B().Get().Key("K2{a}").Cache() resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second)) if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" { t.Fatalf("unexpected response %v %v", v, err) } if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" { t.Fatalf("unexpected response %v %v", v, err) } }) 
t.Run("DoMultiCache Multi Slot", func(t *testing.T) { multi := make([]CacheableTTL, 500) for i := 0; i < len(multi); i++ { multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second) } resps := client.DoMultiCache(context.Background(), multi...) for i := 0; i < len(multi); i++ { if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) { t.Fatalf("unexpected response %v %v", v, err) } } }) t.Run("Receive", func(t *testing.T) { c := client.B().Subscribe().Channel("ch").Build() hdl := func(message PubSubMessage) {} primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { t.Fatalf("unexpected command %v", subscribe) } return nil } replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) { t.Fatalf("unexpected command %v", subscribe) } return nil } if err := client.Receive(context.Background(), c, hdl); err != nil { t.Fatalf("unexpected response %v", err) } }) t.Run("Receive Valkey Err", func(t *testing.T) { c := client.B().Ssubscribe().Channel("ch").Build() e := &ValkeyError{} primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e } replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e } if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e { t.Fatalf("unexpected response %v", err) } }) t.Run("Dedicated Cross Slot Err", func(t *testing.T) { defer func() { if err := recover(); err != panicMsgCxSlot { t.Errorf("Dedicated should panic if cross slots is used") } }() primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } client.Dedicated(func(c DedicatedClient) error { c.Do(context.Background(), 
c.B().Get().Key("a").Build()).Error() return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error() }) }) t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) { defer func() { if err := recover(); err != panicMsgCxSlot { t.Errorf("Dedicated should panic if cross slots is used") } }() primaryNodeConn.AcquireFn = func() wire { return &mockWire{ DoMultiFn: func(multi ...Completed) *valkeyresults { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{typ: '+', string: "a"}}}, nil), }} }, } } client.Dedicated(func(c DedicatedClient) (err error) { c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), ) c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("b").Build(), c.B().Exec().Build(), ) return nil }) }) t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) { primaryNodeConn.AcquireFn = func() wire { return &mockWire{} } err := client.Dedicated(func(c DedicatedClient) (err error) { defer func() { err = errors.New(recover().(string)) }() c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), c.B().Get().Key("b").Build(), ) return nil }) if err == nil || err.Error() != panicMsgCxSlot { t.Errorf("Multi should panic if cross slots is used") } }) t.Run("Dedicated Receive Valkey Err", func(t *testing.T) { e := &ValkeyError{} w := &mockWire{ ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return e }, } primaryNodeConn.AcquireFn = func() wire { return w } replicaNodeConn.AcquireFn = func() wire { return w } // Subscribe can work on replicas if err := client.Dedicated(func(c DedicatedClient) error { return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {}) }); err != e { t.Fatalf("unexpected err %v", err) } }) 
t.Run("Dedicated", func(t *testing.T) { closed := false w := &mockWire{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil) }, DoMultiFn: func(cmd ...Completed) *valkeyresults { if len(cmd) == 4 { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "Delegate0"}, {typ: '+', string: "Delegate1"}, }}, nil), }} } return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil), newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil), }} }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return ErrClosing }, SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { ch := make(chan error, 1) ch <- ErrClosing close(ch) return ch }, ErrorFn: func() error { return ErrClosing }, CloseFn: func() { closed = true }, } primaryNodeConn.AcquireFn = func() wire { return w } stored := false primaryNodeConn.StoreFn = func(ww wire) { if ww != w { t.Fatalf("received unexpected wire %v", ww) } stored = true } if err := client.Dedicated(func(c DedicatedClient) error { ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { t.Fatalf("unexpected response %v %v", v, err) } if v := c.DoMulti(context.Background()); len(v) != 0 { t.Fatalf("received unexpected response %v", v) } for i, resp := range c.DoMulti( context.Background(), c.B().Info().Build(), c.B().Info().Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), 
c.B().Get().Key("a").Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), )[3].val.values { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-ch; err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } c.Close() return nil }); err != nil { t.Fatalf("unexpected err %v", err) } if !stored { t.Fatalf("Dedicated desn't put back the wire") } if !closed { t.Fatalf("Dedicated desn't delegate Close") } }) t.Run("Dedicate", func(t *testing.T) { closed := false w := &mockWire{ DoFn: func(cmd Completed) ValkeyResult { return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil) }, DoMultiFn: func(cmd ...Completed) *valkeyresults { if len(cmd) == 4 { return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '+', string: "OK"}, nil), newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{ {typ: '+', string: "Delegate0"}, {typ: '+', string: "Delegate1"}, }}, nil), }} } return &valkeyresults{s: []ValkeyResult{ newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil), newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil), }} }, ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error { return ErrClosing }, SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error { ch := 
make(chan error, 1) ch <- ErrClosing close(ch) return ch }, ErrorFn: func() error { return ErrClosing }, CloseFn: func() { closed = true }, } primaryNodeConn.AcquireFn = func() wire { return w } stored := false primaryNodeConn.StoreFn = func(ww wire) { if ww != w { t.Fatalf("received unexpected wire %v", ww) } stored = true } c, cancel := client.Dedicate() ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}) if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" { t.Fatalf("unexpected response %v %v", v, err) } if v := c.DoMulti(context.Background()); len(v) != 0 { t.Fatalf("received unexpected response %v", v) } for i, resp := range c.DoMulti( context.Background(), c.B().Info().Build(), c.B().Info().Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), ) { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } for i, resp := range c.DoMulti( context.Background(), c.B().Multi().Build(), c.B().Get().Key("a").Build(), c.B().Get().Key("a").Build(), c.B().Exec().Build(), )[3].val.values { if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) { t.Fatalf("unexpected response %v %v", v, err) } } if err := c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-ch; err != ErrClosing { t.Fatalf("unexpected ret %v", err) } if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing { t.Fatalf("unexpected ret %v", err) } c.Close() cancel() if !stored { t.Fatalf("Dedicated desn't put back the wire") } if !closed { t.Fatalf("Dedicated desn't delegate Close") } 
}) }

// TestClusterClient_SendToOnlyPrimaryNodeWhenPrimaryNodeSelected exercises a
// cluster client configured with SendToReplicas returning true for every
// command and a ReplicaSelector that always returns -1.
//
// The connection factory hands out primaryNodeConn for the two master
// addresses listed in slotsMultiResp and replicaNodeConn for everything else.
// replicaNodeConn is created without any command handlers, while
// primaryNodeConn echoes each command back as a simple string, and every
// subtest asserts on those echoed strings.
//
//gocyclo:ignore
func TestClusterClient_SendToOnlyPrimaryNodeWhenPrimaryNodeSelected(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	primaryNodeConn := &mockConn{
		DoOverride: map[string]func(cmd Completed) ValkeyResult{
			"CLUSTER SLOTS": func(cmd Completed) ValkeyResult {
				return slotsMultiResp
			},
			"GET Do": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET Do"}, nil)
			},
			"GET K1{a}": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil)
			},
			"GET K2{a}": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil)
			},
			"INFO": func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "INFO"}, nil)
			},
		},
		// Echo every pipelined command back as "<CMD> <ARGS...>".
		DoMultiFn: func(multi ...Completed) *valkeyresults {
			resps := make([]ValkeyResult, len(multi))
			for i, cmd := range multi {
				resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Commands(), " ")}, nil)
			}
			return &valkeyresults{s: resps}
		},
		DoCacheOverride: map[string]func(cmd Cacheable, ttl time.Duration) ValkeyResult{
			"GET DoCache": func(cmd Cacheable, ttl time.Duration) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET DoCache"}, nil)
			},
			"GET K1{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET K1{a}"}, nil)
			},
			"GET K2{a}": func(cmd Cacheable, ttl time.Duration) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "GET K2{a}"}, nil)
			},
		},
		// Same echo behaviour for pipelined cacheable commands.
		DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults {
			resps := make([]ValkeyResult, len(multi))
			for i, cmd := range multi {
				resps[i] = newResult(ValkeyMessage{typ: '+', string: strings.Join(cmd.Cmd.Commands(), " ")}, nil)
			}
			return &valkeyresults{s: resps}
		},
	}
	replicaNodeConn := &mockConn{}

	client, err := newClusterClient(
		&ClientOption{
			InitAddress: []string{"127.0.0.1:0"},
			SendToReplicas: func(cmd Completed) bool {
				return true
			},
			ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
				return -1
			},
		},
		func(dst string, opt *ClientOption) conn {
			switch dst {
			case "127.0.0.1:0", "127.0.2.1:0": // primary nodes
				return primaryNodeConn
			default: // replica nodes
				return replicaNodeConn
			}
		},
		newRetryer(defaultRetryDelayFn),
	)
	if err != nil {
		t.Fatalf("unexpected err %v", err)
	}

	// newDelegateWire builds the mock wire shared by the "Dedicated" and
	// "Dedicate" subtests. Its canned replies ("Delegate", "Delegate0",
	// "Delegate1", ErrClosing) are what exerciseDelegation asserts on.
	// onClose is installed as the wire's CloseFn.
	newDelegateWire := func(onClose func()) *mockWire {
		return &mockWire{
			DoFn: func(cmd Completed) ValkeyResult {
				return newResult(ValkeyMessage{typ: '+', string: "Delegate"}, nil)
			},
			DoMultiFn: func(cmd ...Completed) *valkeyresults {
				// Four commands is the MULTI / GET / GET / EXEC batch: the
				// payload lives in the array reply of the last command.
				if len(cmd) == 4 {
					return &valkeyresults{s: []ValkeyResult{
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{
							{typ: '+', string: "Delegate0"},
							{typ: '+', string: "Delegate1"},
						}}, nil),
					}}
				}
				return &valkeyresults{s: []ValkeyResult{
					newResult(ValkeyMessage{typ: '+', string: "Delegate0"}, nil),
					newResult(ValkeyMessage{typ: '+', string: "Delegate1"}, nil),
				}}
			},
			ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
				return ErrClosing
			},
			SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
				ch := make(chan error, 1)
				ch <- ErrClosing
				close(ch)
				return ch
			},
			ErrorFn: func() error {
				return ErrClosing
			},
			CloseFn: onClose,
		}
	}

	// exerciseDelegation drives every DedicatedClient method that the
	// "Dedicated" and "Dedicate" subtests check and fails t when a reply does
	// not match the canned values from newDelegateWire. It does not call
	// c.Close; the callers do that themselves.
	exerciseDelegation := func(t *testing.T, c DedicatedClient) {
		t.Helper()
		ctx := context.Background()

		ch := c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}})
		if v, err := c.Do(ctx, c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v := c.DoMulti(ctx); len(v) != 0 {
			t.Fatalf("received unexpected response %v", v)
		}
		for i, resp := range c.DoMulti(
			ctx,
			c.B().Info().Build(),
			c.B().Info().Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			ctx,
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
		) {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		for i, resp := range c.DoMulti(
			ctx,
			c.B().Multi().Build(),
			c.B().Get().Key("a").Build(),
			c.B().Get().Key("a").Build(),
			c.B().Exec().Build(),
		)[3].val.values {
			if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
		if err := c.Receive(ctx, c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-ch; err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
		if err := <-c.SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != ErrClosing {
			t.Fatalf("unexpected ret %v", err)
		}
	}

	t.Run("Do", func(t *testing.T) {
		c := client.B().Get().Key("Do").Build()
		if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMulti Single Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K2{a}").Build()
		resps := client.DoMulti(context.Background(), c1, c2)
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMulti Single Slot + Init Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Info().Build()
		resps := client.DoMulti(context.Background(), c1, c2)
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "INFO" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMulti Cross Slot + Init Slot", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMixCxSlot {
				t.Errorf("DoMulti should panic if Cross Slot + Init Slot")
			}
		}()
		c1 := client.B().Get().Key("K1{a}").Build()
		c2 := client.B().Get().Key("K1{b}").Build()
		c3 := client.B().Info().Build()
		client.DoMulti(context.Background(), c1, c2, c3)
	})

	t.Run("DoMulti Multi Slot", func(t *testing.T) {
		multi := make([]Completed, 500)
		for i := range multi {
			multi[i] = client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Build()
		}
		resps := client.DoMulti(context.Background(), multi...)
		for i := range multi {
			if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
	})

	t.Run("DoCache", func(t *testing.T) {
		c := client.B().Get().Key("DoCache").Cache()
		if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "GET DoCache" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMultiCache Single Slot", func(t *testing.T) {
		c1 := client.B().Get().Key("K1{a}").Cache()
		c2 := client.B().Get().Key("K2{a}").Cache()
		resps := client.DoMultiCache(context.Background(), CT(c1, time.Second), CT(c2, time.Second))
		if v, err := resps[0].ToString(); err != nil || v != "GET K1{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
		if v, err := resps[1].ToString(); err != nil || v != "GET K2{a}" {
			t.Fatalf("unexpected response %v %v", v, err)
		}
	})

	t.Run("DoMultiCache Multi Slot", func(t *testing.T) {
		multi := make([]CacheableTTL, 500)
		for i := range multi {
			multi[i] = CT(client.B().Get().Key(fmt.Sprintf("K1{%d}", i)).Cache(), time.Second)
		}
		resps := client.DoMultiCache(context.Background(), multi...)
		for i := range multi {
			if v, err := resps[i].ToString(); err != nil || v != fmt.Sprintf("GET K1{%d}", i) {
				t.Fatalf("unexpected response %v %v", v, err)
			}
		}
	})

	t.Run("Receive", func(t *testing.T) {
		c := client.B().Subscribe().Channel("ch").Build()
		hdl := func(message PubSubMessage) {}
		// Both mocks accept only the exact SUBSCRIBE command built above.
		primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) {
				t.Fatalf("unexpected command %v", subscribe)
			}
			return nil
		}
		replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) {
				t.Fatalf("unexpected command %v", subscribe)
			}
			return nil
		}
		if err := client.Receive(context.Background(), c, hdl); err != nil {
			t.Fatalf("unexpected response %v", err)
		}
	})

	t.Run("Receive Valkey Err", func(t *testing.T) {
		c := client.B().Subscribe().Channel("ch").Build()
		e := &ValkeyError{}
		primaryNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			return e
		}
		replicaNodeConn.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
			return e
		}
		if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e {
			t.Fatalf("unexpected response %v", err)
		}
	})

	t.Run("Dedicated Err", func(t *testing.T) {
		v := errors.New("fn err")
		if err := client.Dedicated(func(client DedicatedClient) error {
			return v
		}); err != v {
			t.Fatalf("unexpected err %v", err)
		}
	})

	t.Run("Dedicated Cross Slot Err", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMsgCxSlot {
				t.Errorf("Dedicated should panic if cross slots is used")
			}
		}()
		primaryNodeConn.AcquireFn = func() wire { return &mockWire{} }
		client.Dedicated(func(c DedicatedClient) error {
			c.Do(context.Background(), c.B().Get().Key("a").Build()).Error()
			return c.Do(context.Background(), c.B().Get().Key("b").Build()).Error()
		})
	})

	t.Run("Dedicated Cross Slot Err Multi", func(t *testing.T) {
		defer func() {
			if err := recover(); err != panicMsgCxSlot {
				t.Errorf("Dedicated should panic if cross slots is used")
			}
		}()
		primaryNodeConn.AcquireFn = func() wire {
			return &mockWire{
				DoMultiFn: func(multi ...Completed) *valkeyresults {
					return &valkeyresults{s: []ValkeyResult{
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '+', string: "OK"}, nil),
						newResult(ValkeyMessage{typ: '*', values: []ValkeyMessage{{typ: '+', string: "a"}}}, nil),
					}}
				},
			}
		}
		client.Dedicated(func(c DedicatedClient) (err error) {
			c.DoMulti(
				context.Background(),
				c.B().Multi().Build(),
				c.B().Get().Key("a").Build(),
				c.B().Exec().Build(),
			)
			c.DoMulti(
				context.Background(),
				c.B().Multi().Build(),
				c.B().Get().Key("b").Build(),
				c.B().Exec().Build(),
			)
			return nil
		})
	})

	t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) {
		primaryNodeConn.AcquireFn = func() wire { return &mockWire{} }
		err := client.Dedicated(func(c DedicatedClient) (err error) {
			defer func() {
				// Convert the panic value into the returned error. The nil
				// check matters: without a panic, recover() yields nil and an
				// unchecked .(string) assertion would crash the test instead
				// of letting the assertion below report the failure.
				if r := recover(); r != nil {
					err = fmt.Errorf("%v", r)
				}
			}()
			c.DoMulti(
				context.Background(),
				c.B().Get().Key("a").Build(),
				c.B().Get().Key("b").Build(),
			)
			return nil
		})
		if err == nil || err.Error() != panicMsgCxSlot {
			t.Errorf("Multi should panic if cross slots is used")
		}
	})

	t.Run("Dedicated Receive Valkey Err", func(t *testing.T) {
		e := &ValkeyError{}
		w := &mockWire{
			ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
				return e
			},
		}
		primaryNodeConn.AcquireFn = func() wire { return w }
		if err := client.Dedicated(func(c DedicatedClient) error {
			return c.Receive(context.Background(), c.B().Ssubscribe().Channel("a").Build(), func(msg PubSubMessage) {})
		}); err != e {
			t.Fatalf("unexpected err %v", err)
		}
	})

	t.Run("Dedicated", func(t *testing.T) {
		closed := false
		w := newDelegateWire(func() { closed = true })
		primaryNodeConn.AcquireFn = func() wire { return w }
		stored := false
		primaryNodeConn.StoreFn = func(ww wire) {
			if ww != w {
				t.Fatalf("received unexpected wire %v", ww)
			}
			stored = true
		}
		if err := client.Dedicated(func(c DedicatedClient) error {
			exerciseDelegation(t, c)
			c.Close()
			return nil
		}); err != nil {
			t.Fatalf("unexpected err %v", err)
		}
		if !stored {
			t.Fatalf("Dedicated doesn't put back the wire")
		}
		if !closed {
			t.Fatalf("Dedicated doesn't delegate Close")
		}
	})

	t.Run("Dedicate", func(t *testing.T) {
		closed := false
		w := newDelegateWire(func() { closed = true })
		primaryNodeConn.AcquireFn = func() wire { return w }
		stored := false
		primaryNodeConn.StoreFn = func(ww wire) {
			if ww != w {
				t.Fatalf("received unexpected wire %v", ww)
			}
			stored = true
		}
		c, cancel := client.Dedicate()
		exerciseDelegation(t, c)
		c.Close()
		cancel()
		if !stored {
			t.Fatalf("Dedicate doesn't put back the wire")
		}
		if !closed {
			t.Fatalf("Dedicate doesn't delegate Close")
		}
	})
}