Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions tests/common/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,41 @@ func minimalE2eEnabled() bool {
func e2eClusterTestCases() []testCase {
minimalTestCases := []testCase{
{
name: "NoTLS",
config: config.ClusterConfig{ClusterSize: 1},
name: "NoTLS",
config: config.ClusterConfig{
ClusterSize: 1,
ClusterContext: &e2e.ClusterContext{
ClientHTTPSeparate: false,
},
},
},
{
name: "NoTLS SeparateHTTPPort",
config: config.ClusterConfig{
ClusterSize: 1,
ClusterContext: &e2e.ClusterContext{
ClientHTTPSeparate: true,
},
},
},
{
name: "PeerTLS and ClientTLS",
config: config.ClusterConfig{ClusterSize: 3, PeerTLS: config.ManualTLS, ClientTLS: config.ManualTLS},
name: "PeerTLS and ClientTLS",
config: config.ClusterConfig{
ClusterSize: 3,
PeerTLS: config.ManualTLS,
ClientTLS: config.ManualTLS,
},
},
{
name: "PeerTLS and ClientTLS SeparateHTTPPort",
config: config.ClusterConfig{
ClusterSize: 3,
PeerTLS: config.ManualTLS,
ClientTLS: config.ManualTLS,
ClusterContext: &e2e.ClusterContext{
ClientHTTPSeparate: true,
},
},
},
}

Expand Down
98 changes: 98 additions & 0 deletions tests/common/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ package common

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"

e2e_utils "go.etcd.io/etcd/tests/v3/e2e"
)

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -85,6 +91,7 @@ func TestWatch(t *testing.T) {
wCancel()
require.NoErrorf(t, err, "failed to get key-values from watch channel %s", err)
}
fmt.Println("kvs: ", kvs)

wCancel()
assert.Equal(t, tt.wanted, kvs)
Expand All @@ -93,3 +100,94 @@ func TestWatch(t *testing.T) {
})
}
}

func TestWatchDelayForPeriodicProgressNotification(t *testing.T) {
testRunner.BeforeTest(t)
watchResponsePeriod := 100 * time.Millisecond
watchTestDuration := 5 * time.Second
dbSizeBytes := 5 * 1000 * 1000

for _, tc := range clusterTestCases() {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 20*time.Second)
defer cancel()

cfg := tc.config
if cfg.ClusterContext == nil {
cfg.ClusterContext = &e2e.ClusterContext{}
}
cfg.ClusterContext.(*e2e.ClusterContext).ServerWatchProgressNotifyInterval = watchResponsePeriod

clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(cfg))
defer clus.Close()

cc := testutils.MustClient(clus.Client())

wCtx, cancel := context.WithTimeout(t.Context(), watchTestDuration)
defer cancel()
require.NoError(t, e2e_utils.FillEtcdWithData(ctx, cc, dbSizeBytes))

g := errgroup.Group{}

wch := cc.Watch(wCtx, "fake-key", config.WatchOptions{ProgressNotify: true})
require.NotNil(t, wch)

e2e_utils.ContinuouslyExecuteGetAll(wCtx, t, &g, cc)

e2e_utils.ValidateWatchDelay(t, wch, 150*time.Millisecond)
require.NoError(t, g.Wait())
})
}
}

func TestWatchDelayForEvent(t *testing.T) {
e2e.BeforeTest(t)
watchResponsePeriod := 100 * time.Millisecond
watchTestDuration := 5 * time.Second
dbSizeBytes := 5 * 1000 * 1000

for _, tc := range clusterTestCases() {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 20*time.Second)
defer cancel()

cfg := tc.config
if cfg.ClusterContext == nil {
cfg.ClusterContext = &e2e.ClusterContext{}
}
cfg.ClusterContext.(*e2e.ClusterContext).ServerWatchProgressNotifyInterval = watchResponsePeriod

clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(cfg))
defer clus.Close()

cc := testutils.MustClient(clus.Client())

wCtx, cancel := context.WithTimeout(t.Context(), watchTestDuration)
defer cancel()
require.NoError(t, e2e_utils.FillEtcdWithData(ctx, cc, dbSizeBytes))

g := errgroup.Group{}
g.Go(func() error {
i := 0
for {
err := cc.Put(ctx, "key", fmt.Sprintf("%d", i), config.PutOptions{})
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
}
return err
}
time.Sleep(watchResponsePeriod)
}
})

wch := cc.Watch(wCtx, "key", config.WatchOptions{})
require.NotNil(t, wch)

e2e_utils.ContinuouslyExecuteGetAll(wCtx, t, &g, cc)

e2e_utils.ValidateWatchDelay(t, wch, 150*time.Millisecond)
require.NoError(t, g.Wait())
})
}
}
7 changes: 4 additions & 3 deletions tests/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/integration"
"go.etcd.io/etcd/tests/v3/framework/interfaces"
)

func newClient(t *testing.T, entpoints []string, cfg e2e.ClientConfig) *clientv3.Client {
Expand Down Expand Up @@ -87,7 +89,7 @@ func tlsInfo(tb testing.TB, cfg e2e.ClientConfig) (*transport.TLSInfo, error) {
}
}

func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error {
func FillEtcdWithData(ctx context.Context, c interfaces.Client, dbSize int) error {
g := errgroup.Group{}
concurrency := 10
keyCount := 100
Expand All @@ -97,8 +99,7 @@ func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize)))
if err != nil {
if err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize)), config.PutOptions{}); err != nil {
return err
}
}
Expand Down
103 changes: 103 additions & 0 deletions tests/e2e/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// These tests are performance sensitive, addition of cluster proxy makes them unstable.
//go:build !cluster_proxy

package e2e

import (
"context"
"strings"
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/interfaces"
)

const (
watchResponsePeriod = 100 * time.Millisecond
watchTestDuration = 5 * time.Second
readLoadConcurrency = 10
)

func ValidateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) {
start := time.Now()
var maxDelay time.Duration
for range watch {
sinceLast := time.Since(start)
if sinceLast > watchResponsePeriod+maxWatchDelay {
t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: %s", maxWatchDelay, sinceLast-watchResponsePeriod)
} else {
t.Logf("Got watch response, since last: %s", sinceLast)
}
if sinceLast > maxDelay {
maxDelay = sinceLast
}
start = time.Now()
}
sinceLast := time.Since(start)
if sinceLast > maxDelay && sinceLast > watchResponsePeriod+maxWatchDelay {
t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: unknown", maxWatchDelay)
t.Errorf("Test finished while in middle of delayed response, measured delay: %s", sinceLast-watchResponsePeriod)
t.Logf("Please increase the test duration to measure delay")
} else {
t.Logf("Max delay: %s", maxDelay-watchResponsePeriod)
}
}

func ContinuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Group, c interfaces.Client) {
mux := sync.RWMutex{}
size := 0
for i := 0; i < readLoadConcurrency; i++ {
g.Go(func() error {
for {
resp, err := c.Get(ctx, "", config.GetOptions{Prefix: true})
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
return nil
}
return err
}
respSize := 0
for _, kv := range resp.Kvs {
respSize += kv.Size()
}
mux.Lock()
size += respSize
mux.Unlock()
}
})
}
g.Go(func() error {
lastSize := size
for range time.Tick(time.Second) {
select {
case <-ctx.Done():
return nil
default:
}
mux.RLock()
t.Logf("Generating read load around %.1f MB/s", float64(size-lastSize)/1000/1000)
lastSize = size
mux.RUnlock()
}
return nil
})
}
27 changes: 21 additions & 6 deletions tests/e2e/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,10 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/stringutil"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
watchResponsePeriod = 100 * time.Millisecond
watchTestDuration = 5 * time.Second
readLoadConcurrency = 10
)

type testCase struct {
name string
client e2e.ClientConfig
Expand Down Expand Up @@ -562,3 +557,23 @@ func TestResumeCompactionOnTombstone(t *testing.T) {
t.Fatal("timed out getting watch response")
}
}

func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error {
g := errgroup.Group{}
concurrency := 10
keyCount := 100
keysPerRoutine := keyCount / concurrency
valueSize := dbSize / keyCount
for i := 0; i < concurrency; i++ {
i := i
g.Go(func() error {
for j := 0; j < keysPerRoutine; j++ {
if _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize))); err != nil {
return err
}
}
return nil
})
}
return g.Wait()
}
7 changes: 4 additions & 3 deletions tests/framework/config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ type UserAddOptions struct {
}

type WatchOptions struct {
Prefix bool
Revision int64
RangeEnd string
Prefix bool
Revision int64
RangeEnd string
ProgressNotify bool
}
10 changes: 7 additions & 3 deletions tests/framework/e2e/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package e2e
import (
"fmt"
"strings"
"time"

"github.com/coreos/go-semver/semver"

Expand All @@ -40,9 +41,12 @@ func (cv ClusterVersion) String() string {
}

type ClusterContext struct {
Version ClusterVersion
EnvVars map[string]string
UseUnix bool
Version ClusterVersion
EnvVars map[string]string
UseUnix bool
BasePort int
ClientHTTPSeparate bool
ServerWatchProgressNotifyInterval time.Duration
}

var experimentalFlags = map[string]struct{}{
Expand Down
4 changes: 4 additions & 0 deletions tests/framework/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (e e2eRunner) NewCluster(ctx context.Context, tb testing.TB, opts ...config
if ctx.UseUnix {
e2eConfig.BaseClientScheme = "unix"
}
e2eConfig.ClientHTTPSeparate = ctx.ClientHTTPSeparate
if ctx.ServerWatchProgressNotifyInterval != 0 {
e2eConfig.ServerConfig.WatchProgressNotifyInterval = ctx.ServerWatchProgressNotifyInterval
}
}

switch cfg.ClientTLS {
Expand Down
Loading