Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions pkg/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
cachetools "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

Expand Down Expand Up @@ -423,18 +424,26 @@ func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<-
}

func createResource(info *resource.Info) error {
obj, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).Create(info.Namespace, true, info.Object)
if err != nil {
return err
}
return info.Refresh(obj, true)
return retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
obj, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).Create(info.Namespace, true, info.Object)
if err != nil {
return err
}
return info.Refresh(obj, true)
})
}

func deleteResource(info *resource.Info) error {
policy := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &policy}
_, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).DeleteWithOptions(info.Namespace, info.Name, opts)
return err
return retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
policy := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &policy}
_, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).DeleteWithOptions(info.Namespace, info.Name, opts)
return err
})
}

func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.PatchType, error) {
Expand Down
113 changes: 112 additions & 1 deletion pkg/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ func newResponse(code int, obj runtime.Object) (*http.Response, error) {
return &http.Response{StatusCode: code, Header: header, Body: body}, nil
}

func newResponseJSON(code int, json []byte) (*http.Response, error) {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
body := ioutil.NopCloser(bytes.NewReader(json))
return &http.Response{StatusCode: code, Header: header, Body: body}, nil
}

func newTestClient(t *testing.T) *Client {
testFactory := cmdtesting.NewTestFactory()
t.Cleanup(testFactory.Cleanup)
Expand All @@ -101,6 +108,100 @@ func newTestClient(t *testing.T) *Client {
}
}

func TestCreate(t *testing.T) {
// TODO: A data race occurs when testing creation of more than 1 pod at a time
listA := newPodList("starfish")
listB := newPodList("dolphin")

var actions []string
var iterationCounter int

c := newTestClient(t)
c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{
NegotiatedSerializer: unstructuredSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
path, method := req.URL.Path, req.Method
bodyReader := new(strings.Builder)
_, _ = io.Copy(bodyReader, req.Body)
body := bodyReader.String()
actions = append(actions, path+":"+method)
t.Logf("got request %s %s", path, method)
switch {
case path == "/namespaces/default/pods" && method == "POST":
if strings.Contains(body, "starfish") {
if iterationCounter < 2 {
iterationCounter++
return newResponseJSON(409, resourceQuotaConflict)
}
return newResponse(200, &listA.Items[0])
}
return newResponseJSON(409, resourceQuotaConflict)
default:
t.Fatalf("unexpected request: %s %s", method, path)
return nil, nil
}
}),
}

t.Run("Create success", func(t *testing.T) {
list, err := c.Build(objBody(&listA), false)
if err != nil {
t.Fatal(err)
}

result, err := c.Create(list)
if err != nil {
t.Fatal(err)
}

if len(result.Created) != 1 {
t.Errorf("expected 1 resource created, got %d", len(result.Created))
}

expectedActions := []string{
"/namespaces/default/pods:POST",
"/namespaces/default/pods:POST",
"/namespaces/default/pods:POST",
}
if len(expectedActions) != len(actions) {
t.Fatalf("unexpected number of requests, expected %d, got %d", len(expectedActions), len(actions))
}
for k, v := range expectedActions {
if actions[k] != v {
t.Errorf("expected %s request got %s", v, actions[k])
}
}
})

t.Run("Create failure", func(t *testing.T) {
list, err := c.Build(objBody(&listB), false)
if err != nil {
t.Fatal(err)
}

_, err = c.Create(list)
if err == nil {
t.Errorf("expected error")
}

expectedString := "Operation cannot be fulfilled on " +
"resourcequotas \"quota\": the object has been modified; " +
"please apply your changes to the latest version and try again"
if err.Error() != expectedString {
t.Errorf("Unexpected error message: " + err.Error())
}

expectedActions := []string{
"/namespaces/default/pods:POST",
}
for k, v := range actions {
if expectedActions[0] != v {
t.Errorf("expected %s request got %s", v, actions[k])
}
}
})
}

func TestUpdate(t *testing.T) {
listA := newPodList("starfish", "otter", "squid")
listB := newPodList("starfish", "otter", "dolphin")
Expand All @@ -109,6 +210,7 @@ func TestUpdate(t *testing.T) {
listC.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}}

var actions []string
var iterationCounter int

c := newTestClient(t)
c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{
Expand Down Expand Up @@ -147,6 +249,10 @@ func TestUpdate(t *testing.T) {
}
return newResponse(200, &listB.Items[0])
case p == "/namespaces/default/pods" && m == "POST":
if iterationCounter < 2 {
iterationCounter++
return newResponseJSON(409, resourceQuotaConflict)
}
return newResponse(200, &listB.Items[1])
case p == "/namespaces/default/pods/squid" && m == "DELETE":
return newResponse(200, &listB.Items[1])
Expand Down Expand Up @@ -200,7 +306,9 @@ func TestUpdate(t *testing.T) {
"/namespaces/default/pods/otter:GET",
"/namespaces/default/pods/otter:GET",
"/namespaces/default/pods/dolphin:GET",
"/namespaces/default/pods:POST",
"/namespaces/default/pods:POST", // create dolphin
"/namespaces/default/pods:POST", // retry due to 409
"/namespaces/default/pods:POST", // retry due to 409
"/namespaces/default/pods/squid:GET",
"/namespaces/default/pods/squid:DELETE",
}
Expand Down Expand Up @@ -520,3 +628,6 @@ spec:
ports:
- containerPort: 80
`

var resourceQuotaConflict = []byte(`
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Operation cannot be fulfilled on resourcequotas \"quota\": the object has been modified; please apply your changes to the latest version and try again","reason":"Conflict","details":{"name":"quota","kind":"resourcequotas"},"code":409}`)