Skip to content

Commit

Permalink
Merge pull request kubernetes#122957 from richabanker/uvip-bugfix
Browse files Browse the repository at this point in the history
MVP cleanup #2
  • Loading branch information
k8s-ci-robot authored Oct 3, 2024
2 parents 380c00d + 9c65b79 commit 3660a34
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 76 deletions.
103 changes: 55 additions & 48 deletions staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,6 @@ type peerProxyHandler struct {
finishedSync atomic.Bool
}

type serviceableByResponse struct {
locallyServiceable bool
errorFetchingAddressFromLease bool
peerEndpoints []string
}

// responder implements rest.Responder for assisting a connector in writing objects or errors.
type responder struct {
w http.ResponseWriter
Expand Down Expand Up @@ -149,84 +143,97 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
gvr.Group = "core"
}

// find servers that are capable of serving this request
serviceableByResp, err := h.findServiceableByServers(gvr)
apiservers, err := h.findServiceableByServers(gvr)
if err != nil {
// this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is
handler.ServeHTTP(w, r)
return
}
// found the gvr locally, pass request to the next handler in local apiserver
if serviceableByResp.locallyServiceable {
// resource wasn't found in SV informer cache which means that resource is an aggregated API
// or a CR. This situation is ok to be handled by local handler.
handler.ServeHTTP(w, r)
return
}

gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
if serviceableByResp.errorFetchingAddressFromLease {
klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
locallyServiceable, peerEndpoints, err := h.resolveServingLocation(apiservers)
if err != nil {
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr)
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
return
}

// no apiservers were found that could serve the request, pass request to
// next handler, that should eventually serve 404

// pass request to the next handler if found the gvr locally.
// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
// consult the storageversion-informed map for those
if len(serviceableByResp.peerEndpoints) == 0 {
if locallyServiceable {
handler.ServeHTTP(w, r)
return
}

if len(peerEndpoints) == 0 {
klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
handler.ServeHTTP(w, r)
return
}

// otherwise, randomly select an apiserver and proxy request to it
rand := rand.Intn(len(serviceableByResp.peerEndpoints))
destServerHostPort := serviceableByResp.peerEndpoints[rand]
rand := rand.Intn(len(peerEndpoints))
destServerHostPort := peerEndpoints[rand]
h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort)

})
}

func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (serviceableByResponse, error) {

func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (*sync.Map, error) {
apiserversi, ok := h.svMap.Load(gvr)

// no value found for the requested gvr in svMap
if !ok || apiserversi == nil {
return serviceableByResponse{}, fmt.Errorf("no StorageVersions found for the GVR: %v", gvr)
return nil, fmt.Errorf("no storageVersions found for the GVR: %v", gvr)
}
apiservers := apiserversi.(*sync.Map)
response := serviceableByResponse{}

apiservers, _ := apiserversi.(*sync.Map)
return apiservers, nil
}

func (h *peerProxyHandler) resolveServingLocation(apiservers *sync.Map) (bool, []string, error) {
var peerServerEndpoints []string
var locallyServiceable bool
var respErr error

apiservers.Range(func(key, value interface{}) bool {
apiserverKey := key.(string)
if apiserverKey == h.serverId {
response.locallyServiceable = true
locallyServiceable = true
// stop iteration
return false
}

hostPort, err := h.reconciler.GetEndpoint(apiserverKey)
hostPort, err := h.hostportInfo(apiserverKey)
if err != nil {
response.errorFetchingAddressFromLease = true
klog.ErrorS(err, "failed to get peer ip from storage lease for server", "serverID", apiserverKey)
respErr = err
// continue with iteration
return true
}
// check ip format
_, _, err = net.SplitHostPort(hostPort)
if err != nil {
response.errorFetchingAddressFromLease = true
klog.ErrorS(err, "invalid address found for server", "serverID", apiserverKey)
return true
}

peerServerEndpoints = append(peerServerEndpoints, hostPort)
return true
})

response.peerEndpoints = peerServerEndpoints
return response, nil
// reset err if there was atleast one valid peer server found.
if len(peerServerEndpoints) > 0 {
respErr = nil
}

return locallyServiceable, peerServerEndpoints, respErr
}

func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) {
hostport, err := h.reconciler.GetEndpoint(apiserverKey)
if err != nil {
return "", err
}
// check ip format
_, _, err = net.SplitHostPort(hostport)
if err != nil {
return "", err
}

return hostport, nil
}

func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
Expand All @@ -248,13 +255,11 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request,
defer cancelFn()

proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport)

delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw}
w := responsewriter.WrapForHTTP1Or2(delegate)

handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()})
handler.ServeHTTP(w, newReq)
// Increment the count of proxied requests
metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status()))
}

Expand All @@ -280,11 +285,13 @@ func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
klog.Error("Invalid StorageVersion provided to updateSV()")
return
}

newSV, ok := newObj.(*v1alpha1.StorageVersion)
if !ok {
klog.Error("Invalid StorageVersion provided to updateSV()")
return
}

h.updateSVMap(oldSV, newSV)
}

Expand All @@ -295,17 +302,17 @@ func (h *peerProxyHandler) deleteSV(obj interface{}) {
klog.Error("Invalid StorageVersion provided to deleteSV()")
return
}

h.updateSVMap(sv, nil)
}

// Delete old storageversion, add new storagversion
func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) {
if oldSV != nil {
// delete old SV entries
h.deleteSVFromMap(oldSV)
}

if newSV != nil {
// add new SV entries
h.addSVToMap(newSV)
}
}
Expand Down
Loading

0 comments on commit 3660a34

Please sign in to comment.