Skip to content

Commit

Permalink
remove wrapper format
Browse files Browse the repository at this point in the history
  • Loading branch information
珩轩 committed Jun 4, 2024
1 parent 2c4a314 commit a1d1502
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 106 deletions.
7 changes: 3 additions & 4 deletions pkg/networkengine/routedriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/openyurtio/raven/cmd/agent/app/config"
"github.com/openyurtio/raven/pkg/types"
"github.com/openyurtio/raven/pkg/utils"
)

// Driver is the interface for inner gateway routing mechanism.
Expand Down Expand Up @@ -61,17 +60,17 @@ func RegisterRouteDriver(name string, factory Factory) {
driversMutex.Lock()
defer driversMutex.Unlock()
if _, found := drivers[name]; found {
klog.Fatal(utils.FormatTunnel("route drivers %q was registered twice", name))
klog.Fatal("route drivers %q was registered twice", name)
}
klog.V(1).Info(utils.FormatTunnel("registered route driver %q", name))
klog.Info("registered route driver %q", name)
drivers[name] = factory
}

func New(name string, cfg *config.Config) (Driver, error) {
driversMutex.Lock()
defer driversMutex.Unlock()
if _, found := drivers[name]; !found {
klog.Fatal(utils.FormatTunnel("route driver %q not found", name))
klog.Fatal("route driver %q not found", name)
}
return drivers[name](cfg)
}
3 changes: 1 addition & 2 deletions pkg/networkengine/routedriver/vxlan/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/klog/v2"

netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink"
"github.com/openyurtio/raven/pkg/utils"
)

const (
Expand All @@ -42,7 +41,7 @@ func ensureVxlanLink(vxlan netlink.Vxlan, vtepIP net.IP) (netlink.Link, error) {
return link
}
if _, ok := err.(netlink.LinkNotFoundError); !ok {
klog.Errorf(utils.FormatTunnel("error get vxlan link: %v", err))
klog.Errorf("error get vxlan link: %v", err)
}
return nil
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/networkengine/routedriver/vxlan/vxlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"strings"
"syscall"
"time"

"github.com/vdobler/ht/errorlist"
"github.com/vishvananda/netlink"
Expand All @@ -36,7 +37,6 @@ import (
ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset"
iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables"
"github.com/openyurtio/raven/pkg/types"
"github.com/openyurtio/raven/pkg/utils"
)

const (
Expand All @@ -53,7 +53,8 @@ const (

ravenMark = 0x40

ravenMarkSet = "raven-mark-set"
ravenMarkSet = "raven-mark-set"
ravenMarkSetType = "hash:net"
)

var (
Expand All @@ -76,11 +77,11 @@ type vxlan struct {

func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error)) (err error) {
if network.LocalEndpoint == nil || len(network.RemoteEndpoints) == 0 {
klog.Info(utils.FormatTunnel("no local gateway or remote gateway is found, cleaning up route setting"))
klog.Info("no local gateway or remote gateway is found, cleaning up route setting")
return vx.Cleanup()
}
if len(network.LocalNodeInfo) == 1 {
klog.Infof(utils.FormatTunnel("only gateway node exist in current gateway, cleaning up route setting"))
klog.Info("only gateway node exist in current gateway, cleaning up route setting")
return vx.Cleanup()
}

Expand All @@ -103,8 +104,7 @@ func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error
// The desired and current ipset entries calculated from given network.
// The key is ip set entry
var desiredSet, currentSet map[string]*netlink.IPSetEntry

vx.ipset, err = ipsetutil.New(ravenMarkSet)
vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{})
if err != nil {
return fmt.Errorf("error create ip set: %s", err)
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (vx *vxlan) Init() (err error) {
return err
}

vx.ipset, err = ipsetutil.New(ravenMarkSet)
vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{})
if err != nil {
return err
}
Expand Down Expand Up @@ -530,7 +530,7 @@ func (vx *vxlan) calIPSetOnNode(network *types.Network) map[string]*netlink.IPSe
CIDR: uint8(ones),
Replace: true,
}
set[ipsetutil.SetEntryKey(entry)] = entry
set[vx.ipset.Key(entry)] = entry
}
}
return set
Expand Down Expand Up @@ -569,14 +569,16 @@ func (vx *vxlan) Cleanup() error {
}

// Clean may be called more than one time, so we should ensure ip set exists
vx.ipset, err = ipsetutil.New(ravenMarkSet)
vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{})
if err != nil {
errList = errList.Append(fmt.Errorf("error ensure ip set %s: %s", ravenMarkSet, err))
}
time.Sleep(time.Second)
err = vx.ipset.Flush()
if err != nil {
errList = errList.Append(fmt.Errorf("error flushing ipset: %s", err))
}
time.Sleep(time.Second)
err = vx.ipset.Destroy()
if err != nil {
errList = errList.Append(fmt.Errorf("error destroying ipset: %s", err))
Expand Down
8 changes: 4 additions & 4 deletions pkg/proxyengine/proxyclient/proxyclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *ProxyClient) Start(ctx context.Context) error {
}
clientCertManager, err := factory.NewCertManagerFactory(c.client).New(certMgrCfg)
if err != nil {
klog.Errorf(utils.FormatProxyClient("failed to new cert manager factory for proxy client %s, error %s", c.name, err.Error()))
klog.Errorf("failed to new cert manager factory for proxy client %s, error %s", c.name, err.Error())
return fmt.Errorf("failed to new cert manager factory for proxy client %s, error %s", c.name, err.Error())
}
clientCertManager.Start()
Expand All @@ -83,13 +83,13 @@ func (c *ProxyClient) Start(ctx context.Context) error {
if clientCertManager.Current() != nil {
return true, nil
}
klog.Infof(utils.FormatProxyClient("certificate %s not signed, waiting...", certMgrCfg.CommonName))
klog.Infof("certificate %s not signed, waiting...", certMgrCfg.CommonName)
return false, nil
}, ctx.Done())
for addr := range c.servers {
tlsCfg, err := certmanager.GenTLSConfigUseCertMgrAndCA(clientCertManager, addr, utils.RavenCAFile)
if err != nil {
klog.Error(utils.FormatProxyClient("failed to generate TLS Config"))
klog.Error("failed to generate TLS Config")
return fmt.Errorf("failed to generate TLS Config")
}
c.servers[addr] = tlsCfg
Expand All @@ -105,7 +105,7 @@ func (c *ProxyClient) run(stopCh <-chan struct{}) {
for addr, cert := range c.servers {
client := c.NewClient(addr, cert, stopCh)
client.Serve()
klog.Infof(utils.FormatProxyClient("start serving grpc request redirected from %s", addr))
klog.Infof("start serving grpc request redirected from %s", addr)
}
}

Expand Down
38 changes: 19 additions & 19 deletions pkg/proxyengine/proxyserver/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ func (c *Interceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// serverRequest serves the normal requests, e.g., kubectl logs
func serveRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) {
klog.Info(utils.FormatProxyServer("interceptor: start serving request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]))
defer klog.Info(utils.FormatProxyServer("interceptor: stop serving request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]))
klog.Infof("interceptor: start serving request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])
defer klog.Infof("interceptor: stop serving request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])
br := newBufioReader(conn)
defer putBufioReader(br)
resp, err := http.ReadResponse(br, r)
Expand Down Expand Up @@ -189,10 +189,10 @@ func serveRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
select {
case <-stopCh:
klog.Info(utils.FormatProxyServer("chunked request(%s) normally exit", r.URL.String()))
klog.Info("chunked request(%s) normally exit", r.URL.String())
case <-ctx.Done():
klog.Info(utils.FormatProxyServer("chunked request(%s) to agent(%s) closed by cloud client, %v", r.URL.String(),
r.Header.Get(utils.RavenProxyHostHeaderKey), ctx.Err()))
klog.Info("chunked request(%s) to agent(%s) closed by cloud client, %v", r.URL.String(),
r.Header.Get(utils.RavenProxyHostHeaderKey), ctx.Err())
conn.Close()
}
}(r, conn, stopCh)
Expand All @@ -203,15 +203,15 @@ func serveRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) {
}
_, err = io.Copy(writer, resp.Body)
if err != nil && !isHTTPCloseError(err) {
klog.ErrorS(err, utils.FormatProxyServer("failed to copy response from proxy server to the frontend"))
klog.ErrorS(err, "failed to copy response from proxy server to the frontend")
}
}

func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) {
klog.Info(utils.FormatProxyServer("interceptor: start serving streaming request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]))
defer klog.Info(utils.FormatProxyServer("interceptor: stop serving streaming request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]))
klog.Infof("interceptor: start serving streaming request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])
defer klog.Infof("interceptor: stop serving streaming request %s with header: host %s, proxy mode: %s",
r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])

resp, rawResp, err := getResponse(conn)
if err != nil {
Expand All @@ -233,37 +233,37 @@ func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request)
if resp.StatusCode != http.StatusSwitchingProtocols {
deadline := time.Now().Add(10 * time.Second)
if err = conn.SetReadDeadline(deadline); err != nil {
klog.Errorf(utils.FormatProxyServer("failed set proxy connect deadline, error %s", err.Error()))
klog.Errorf("failed set proxy connect deadline, error %s", err.Error())
}
if err = frontend.SetReadDeadline(deadline); err != nil {
klog.Errorf(utils.FormatProxyServer("failed set frontend connect deadline, error %s", err.Error()))
klog.Errorf("failed set frontend connect deadline, error %s", err.Error())
}
err = resp.Write(frontend)
if err != nil && !isHTTPCloseError(err) {
klog.Errorf(utils.FormatProxyServer("error proxying un-upgrade response from proxy channel to frontend: %s", err.Error()))
klog.Errorf("error proxying un-upgrade response from proxy channel to frontend: %s", err.Error())
}
return
}

if len(rawResp) > 0 {
if _, err = frontend.Write(rawResp); err != nil {
klog.Errorf(utils.FormatProxyServer("error proxying response bytes from tunnel to client: %s", err.Error()))
klog.Errorf("error proxying response bytes from tunnel to client: %s", err.Error())
}
}

readerComplete, writerComplete := make(chan struct{}), make(chan struct{})
go func() {
_, err = io.Copy(conn, frontend)
if err != nil && !isHTTPCloseError(err) {
klog.Errorf(utils.FormatProxyServer("error proxying data from frontend to proxy channel: %s", err.Error()))
klog.Errorf("error proxying data from frontend to proxy channel: %s", err.Error())
}
close(writerComplete)
}()

go func() {
_, err = io.Copy(frontend, conn)
if err != nil && !isHTTPCloseError(err) {
klog.Errorf(utils.FormatProxyServer("error proxying data from proxy channel to frontend: %s", err.Error()))
klog.Errorf("error proxying data from proxy channel to frontend: %s", err.Error())
}
close(readerComplete)
}()
Expand All @@ -276,7 +276,7 @@ func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request)

func logAndHTTPError(w http.ResponseWriter, errCode int, format string, i ...interface{}) {
errMsg := fmt.Sprintf(format, i...)
klog.Error(utils.FormatProxyServer(errMsg))
klog.Error(errMsg)
http.Error(w, errMsg, errCode)
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/proxyengine/proxyserver/manageheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,21 @@ func NewHeaderManager(client client.Client, gatewayName string, isIPv4 bool) Wra
func (h *headerManger) Handler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r == nil {
klog.Errorf(utils.FormatProxyServer("request is nil, skip it"))
klog.Errorf("request is nil, skip it")
return
}
oldHost := r.Host
var host, ip, port string
var err error
if isAPIServerRequest(r) {
klog.Info(utils.FormatProxyServer("request from apiserver with host %s and url %s is processed by header manager", oldHost, r.URL.String()))
klog.Infof("request from apiserver with host %s and url %s is processed by header manager", oldHost, r.URL.String())
host, ip, port, err = h.getAPIServerRequestDestAddress(r)
if err != nil {
logAndHTTPError(w, http.StatusBadRequest, "request host %s and url %s is invalid, %s", r.Host, r.URL.String(), err.Error())
return
}
} else {
klog.Info(utils.FormatProxyServer("normal request with host %s and url %s is processed by header manager", oldHost, r.URL.String()))
klog.Infof("normal request with host %s and url %s is processed by header manager", oldHost, r.URL.String())
host, ip, port, err = h.getNormalRequestDestAddress(r)
if err != nil {
logAndHTTPError(w, http.StatusBadRequest, "request host %s and url %s is invalid, %s", r.Host, r.URL.String(), err.Error())
Expand Down Expand Up @@ -117,11 +117,11 @@ func (h *headerManger) Handler(handler http.Handler) http.Handler {
metrics.Metrics.IncInFlightRequests(r.Method, r.URL.Path)
defer metrics.Metrics.DecInFlightRequests(r.Method, r.URL.Path)

klog.Infoln(utils.FormatProxyServer("start handling request %s %s, req.Host changed from %s to %s, remote address is %s",
r.Method, r.URL.String(), oldHost, r.Host, r.RemoteAddr))
klog.Infof("start handling request %s %s, req.Host changed from %s to %s, remote address is %s",
r.Method, r.URL.String(), oldHost, r.Host, r.RemoteAddr)
start := time.Now()
handler.ServeHTTP(w, r)
klog.Infoln(utils.FormatProxyServer("finish handle request %s %s, handle lasts %v", r.Method, r.URL.String(), time.Since(start)))
klog.Infof("finish handle request %s %s, handle lasts %v", r.Method, r.URL.String(), time.Since(start))
})
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (h *headerManger) getNormalRequestDestAddress(r *http.Request) (name, ip, p
}
ipAddress := net.ParseIP(nodeName)
if ipAddress != nil {
klog.Warning(utils.FormatProxyServer("raven proxy server not support dest address %s and request.URL is %s", ipAddress, r.URL.String()))
klog.Warningf("raven proxy server not support dest address %s and request.URL is %s", ipAddress, r.URL.String())
return "", "", "", nil
}
var node v1.Node
Expand Down
14 changes: 7 additions & 7 deletions pkg/proxyengine/proxyserver/proxyserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ func (c *ProxyServer) Start(ctx context.Context) error {
if serverCertMgr.Current() != nil && proxyCertMgr.Current() != nil {
return true, nil
}
klog.Infof(utils.FormatProxyServer("certificate %s and %s not signed, waiting...", serverCertCfg.ComponentName, proxyCertCfg.ComponentName))
klog.Infof("certificate %s and %s not signed, waiting...", serverCertCfg.ComponentName, proxyCertCfg.ComponentName)
return false, nil
}, ctx.Done())

klog.Infof(utils.FormatProxyServer("certificate %s and %s ok", serverCertCfg.ComponentName, proxyCertCfg.ComponentName))
klog.Infof("certificate %s and %s ok", serverCertCfg.ComponentName, proxyCertCfg.ComponentName)
c.serverTLSConfig, err = certmanager.GenTLSConfigUseCurrentCertAndCertPool(serverCertMgr.Current, c.rootCert, "server")
if err != nil {
return err
Expand All @@ -171,7 +171,7 @@ func (c *ProxyServer) Start(ctx context.Context) error {
}

func (c *ProxyServer) runServers(ctx context.Context) error {
klog.Info(utils.FormatProxyServer("start proxy server"))
klog.Info("start proxy server")
strategy := []anpserver.ProxyStrategy{anpserver.ProxyStrategyDestHost}
proxyServer := anpserver.NewProxyServer(c.nodeName, strategy, 1, &anpserver.AgentTokenAuthenticationOptions{})
NewProxies(&anpserver.Tunnel{Server: proxyServer}, c.interceptorUDSFile).Run(ctx)
Expand All @@ -192,8 +192,8 @@ func (c *ProxyServer) getProxyServerIPsAndDNSName() (dnsName []string, ipAddr []
var svc v1.Service
err := c.client.Get(context.TODO(), types.NamespacedName{Namespace: utils.WorkingNamespace, Name: utils.GatewayProxyInternalService}, &svc)
if err != nil {
klog.Errorf(utils.FormatProxyServer("failed to get internal service %s/%s to get proxy server IPs and DNSNames, error %s",
svc.GetNamespace(), svc.GetName(), err.Error()))
klog.Errorf("failed to get internal service %s/%s to get proxy server IPs and DNSNames, error %s",
svc.GetNamespace(), svc.GetName(), err.Error())
return
}
dnsName = append(dnsName, getDefaultDomainsForSvc(svc.GetNamespace(), svc.GetName())...)
Expand All @@ -209,8 +209,8 @@ func (c *ProxyServer) getProxyServerIPsAndDNSName() (dnsName []string, ipAddr []
}.AsSelector(),
})
if err != nil {
klog.Errorf(utils.FormatProxyServer("failed to get public serivce for gateway %s, node %s to get proxy server IPs and DNSNames, error %s",
c.gateway.GetName(), c.nodeName, err.Error()))
klog.Errorf("failed to get public serivce for gateway %s, node %s to get proxy server IPs and DNSNames, error %s",
c.gateway.GetName(), c.nodeName, err.Error())
return
}

Expand Down
Loading

0 comments on commit a1d1502

Please sign in to comment.