package valkey import ( "bufio" "context" "errors" "fmt" "net" "runtime" "strconv" "sync" "sync/atomic" "testing" "time" "github.com/valkey-io/valkey-go/internal/cmds" ) func setupMux(wires []*mockWire) (conn *mux, checkClean func(t *testing.T)) { return setupMuxWithOption(wires, &ClientOption{}) } func setupMuxWithOption(wires []*mockWire, option *ClientOption) (conn *mux, checkClean func(t *testing.T)) { var mu sync.Mutex var count = -1 wfn := func() wire { mu.Lock() defer mu.Unlock() count++ return wires[count] } if option.BlockingPipeline == 0 { option.BlockingPipeline = DefaultBlockingPipeline } return newMux("", option, (*mockWire)(nil), (*mockWire)(nil), wfn, wfn), func(t *testing.T) { if count != len(wires)-1 { t.Fatalf("there is %d remaining unused wires", len(wires)-count-1) } } } func TestNewMuxDailErr(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) c := 0 e := errors.New("any") m := makeMux("", &ClientOption{}, func(dst string, opt *ClientOption) (net.Conn, error) { c++ return nil, e }) if err := m.Dial(); err != e { t.Fatalf("unexpected return %v", err) } if c != 1 { t.Fatalf("dialFn not called") } if w := m.pipe(0); w != m.dead { // c = 2 t.Fatalf("unexpected wire %v", w) } if err := m.Dial(); err != e { // c = 3 t.Fatalf("unexpected return %v", err) } if w := m.Acquire(); w != m.dead { t.Fatalf("unexpected wire %v", w) } if c != 4 { t.Fatalf("dialFn not called %v", c) } } func TestNewMux(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) n1, n2 := net.Pipe() mock := &valkeyMock{t: t, buf: bufio.NewReader(n2), conn: n2} go func() { mock.Expect("HELLO", "3"). Reply(ValkeyMessage{ typ: '%', values: []ValkeyMessage{ {typ: '+', string: "proto"}, {typ: ':', integer: 3}, }, }) mock.Expect("CLIENT", "TRACKING", "ON", "OPTIN"). ReplyString("OK") mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName). ReplyError("UNKNOWN COMMAND") mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer). ReplyError("UNKNOWN COMMAND") mock.Expect("PING").ReplyString("OK") mock.Close() }() m := makeMux("", &ClientOption{}, func(dst string, opt *ClientOption) (net.Conn, error) { return n1, nil }) if err := m.Dial(); err != nil { t.Fatalf("unexpected error %v", err) } t.Run("Override with previous mux", func(t *testing.T) { m2 := makeMux("", &ClientOption{}, func(dst string, opt *ClientOption) (net.Conn, error) { return n1, nil }) m2.Override(m) if err := m2.Dial(); err != nil { t.Fatalf("unexpected error %v", err) } m2.Close() }) } func TestNewMuxPipelineMultiplex(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) for _, v := range []int{-1, 0, 1, 2} { m := makeMux("", &ClientOption{PipelineMultiplex: v}, func(dst string, opt *ClientOption) (net.Conn, error) { return nil, nil }) if (v < 0 && len(m.wire) != 1) || (v >= 0 && len(m.wire) != 1<