Fix dependencies

This commit is contained in:
TwinProduction
2020-12-25 03:00:08 -05:00
parent 10ab9265d9
commit 416178fb28
1449 changed files with 7770 additions and 474390 deletions

View File

@ -94,10 +94,6 @@ type RESTClient struct {
// overridden.
rateLimiter flowcontrol.RateLimiter
// warningHandler is shared among all requests created by this client.
// If not set, defaultWarningHandler is used.
warningHandler WarningHandler
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}

View File

@ -23,7 +23,6 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
gruntime "runtime"
@ -38,7 +37,7 @@ import (
"k8s.io/client-go/transport"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/klog"
)
const (
@ -123,23 +122,12 @@ type Config struct {
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
// WarningHandler handles warnings in server responses.
// If not set, the default warning handler is used.
WarningHandler WarningHandler
// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
Timeout time.Duration
// Dial specifies the dial function for creating unencrypted TCP connections.
Dial func(ctx context.Context, network, address string) (net.Conn, error)
// Proxy is the the proxy func to be used for all requests made by this
// transport. If Proxy is nil, http.ProxyFromEnvironment is used. If Proxy
// returns a nil *URL, no proxy is used.
//
// socks5 proxying does not currently support spdy streaming endpoints.
Proxy func(*http.Request) (*url.URL, error)
// Version forces a specific version to be used (if registered)
// Do we need this?
// Version string
@ -343,11 +331,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
}
// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
@ -401,11 +385,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
}
// SetKubernetesDefaults sets default values on the provided client config for accessing the
@ -574,14 +554,12 @@ func AnonymousClientConfig(config *Config) *Config {
NextProtos: config.TLSClientConfig.NextProtos,
},
RateLimiter: config.RateLimiter,
WarningHandler: config.WarningHandler,
UserAgent: config.UserAgent,
DisableCompression: config.DisableCompression,
QPS: config.QPS,
Burst: config.Burst,
Timeout: config.Timeout,
Dial: config.Dial,
Proxy: config.Proxy,
}
}
@ -621,9 +599,7 @@ func CopyConfig(config *Config) *Config {
QPS: config.QPS,
Burst: config.Burst,
RateLimiter: config.RateLimiter,
WarningHandler: config.WarningHandler,
Timeout: config.Timeout,
Dial: config.Dial,
Proxy: config.Proxy,
}
}

View File

@ -21,7 +21,7 @@ import (
"net/http"
"sync"
"k8s.io/klog/v2"
"k8s.io/klog"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

View File

@ -45,7 +45,7 @@ import (
restclientwatch "k8s.io/client-go/rest/watch"
"k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/klog"
)
var (
@ -88,12 +88,9 @@ var noBackoff = &NoBackoff{}
type Request struct {
c *RESTClient
warningHandler WarningHandler
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
maxRetries int
// generic components accessible via method setters
verb string
@ -137,13 +134,11 @@ func NewRequest(c *RESTClient) *Request {
}
r := &Request{
c: c,
rateLimiter: c.rateLimiter,
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
maxRetries: 10,
warningHandler: c.warningHandler,
c: c,
rateLimiter: c.rateLimiter,
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
}
switch {
@ -221,13 +216,6 @@ func (r *Request) BackOff(manager BackoffManager) *Request {
return r
}
// WarningHandler sets the handler this client uses when warning headers are encountered.
// If set to nil, this client will use the default warning handler (see SetDefaultWarningHandler).
func (r *Request) WarningHandler(handler WarningHandler) *Request {
r.warningHandler = handler
return r
}
// Throttle receives a rate-limiter and sets or replaces an existing request limiter
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
r.rateLimiter = limiter
@ -403,18 +391,6 @@ func (r *Request) Timeout(d time.Duration) *Request {
return r
}
// MaxRetries makes the request use the given integer as a ceiling of retrying upon receiving
// "Retry-After" headers and 429 status-code in the response. The default is 10 unless this
// function is specifically called with a different value.
// A zero maxRetries prevent it from doing retires and return an error immediately.
func (r *Request) MaxRetries(maxRetries int) *Request {
if maxRetries < 0 {
maxRetries = 0
}
r.maxRetries = maxRetries
return r
}
// Body makes the request use obj as the body. Optional.
// If obj is a string, try to read a file of that name.
// If obj is a []byte, send it directly.
@ -618,7 +594,7 @@ var globalThrottledLogger = &throttledLogger{
func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
for _, setting := range b.settings {
if bool(klog.V(setting.logLevel).Enabled()) {
if bool(klog.V(setting.logLevel)) {
// Return early without write locking if possible.
if func() bool {
setting.lock.RLock()
@ -702,8 +678,6 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
return nil, err
}
handleWarnings(resp.Header, r.warningHandler)
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
@ -776,7 +750,6 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
switch {
case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
handleWarnings(resp.Header, r.warningHandler)
return resp.Body, nil
default:
@ -858,6 +831,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
}
// Right now we make about ten retry attempts if we get a Retry-After response.
maxRetries := 10
retries := 0
for {
@ -920,7 +894,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
}()
retries++
if seconds, wait := checkWait(resp); wait && retries <= r.maxRetries {
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
@ -1033,7 +1007,6 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
body: body,
contentType: contentType,
statusCode: resp.StatusCode,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
}
@ -1052,7 +1025,6 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
statusCode: resp.StatusCode,
decoder: decoder,
err: err,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
@ -1061,7 +1033,6 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
contentType: contentType,
statusCode: resp.StatusCode,
decoder: decoder,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
@ -1069,11 +1040,11 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
func truncateBody(body string) string {
max := 0
switch {
case bool(klog.V(10).Enabled()):
case bool(klog.V(10)):
return body
case bool(klog.V(9).Enabled()):
case bool(klog.V(9)):
max = 10240
case bool(klog.V(8).Enabled()):
case bool(klog.V(8)):
max = 1024
}
@ -1088,7 +1059,7 @@ func truncateBody(body string) string {
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
// whether the body is printable.
func glogBody(prefix string, body []byte) {
if klog.V(8).Enabled() {
if klog.V(8) {
if bytes.IndexFunc(body, func(r rune) bool {
return r < 0x0a
}) != -1 {
@ -1197,7 +1168,6 @@ func retryAfterSeconds(resp *http.Response) (int, bool) {
// Result contains the result of calling Request.Do().
type Result struct {
body []byte
warnings []net.WarningHeader
contentType string
err error
statusCode int
@ -1311,11 +1281,6 @@ func (r Result) Error() error {
return r.err
}
// Warnings returns any warning headers received in the response
func (r Result) Warnings() []net.WarningHeader {
return r.warnings
}
// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
var NameMayNotBe = []string{".", ".."}

View File

@ -85,8 +85,7 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
Groups: c.Impersonate.Groups,
Extra: c.Impersonate.Extra,
},
Dial: c.Dial,
Proxy: c.Proxy,
Dial: c.Dial,
}
if c.ExecProvider != nil && c.AuthProvider != nil {

View File

@ -22,7 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/klog"
)
// Set of resp. Codes that we backoff for.

View File

@ -1,144 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"fmt"
"io"
"net/http"
"sync"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/net"
)
// WarningHandler is an interface for handling warning headers
type WarningHandler interface {
// HandleWarningHeader is called with the warn code, agent, and text when a warning header is countered.
HandleWarningHeader(code int, agent string, text string)
}
var (
defaultWarningHandler WarningHandler = WarningLogger{}
defaultWarningHandlerLock sync.RWMutex
)
// SetDefaultWarningHandler sets the default handler client uses when warning headers are encountered.
// By default, warnings are printed to stderr.
func SetDefaultWarningHandler(l WarningHandler) {
defaultWarningHandlerLock.Lock()
defer defaultWarningHandlerLock.Unlock()
defaultWarningHandler = l
}
func getDefaultWarningHandler() WarningHandler {
defaultWarningHandlerLock.RLock()
defer defaultWarningHandlerLock.RUnlock()
l := defaultWarningHandler
return l
}
// NoWarnings is an implementation of WarningHandler that suppresses warnings.
type NoWarnings struct{}
func (NoWarnings) HandleWarningHeader(code int, agent string, message string) {}
// WarningLogger is an implementation of WarningHandler that logs code 299 warnings
type WarningLogger struct{}
func (WarningLogger) HandleWarningHeader(code int, agent string, message string) {
if code != 299 || len(message) == 0 {
return
}
klog.Warning(message)
}
type warningWriter struct {
// out is the writer to output warnings to
out io.Writer
// opts contains options controlling warning output
opts WarningWriterOptions
// writtenLock guards written and writtenCount
writtenLock sync.Mutex
writtenCount int
written map[string]struct{}
}
// WarningWriterOptions controls the behavior of a WarningHandler constructed using NewWarningWriter()
type WarningWriterOptions struct {
// Deduplicate indicates a given warning message should only be written once.
// Setting this to true in a long-running process handling many warnings can result in increased memory use.
Deduplicate bool
// Color indicates that warning output can include ANSI color codes
Color bool
}
// NewWarningWriter returns an implementation of WarningHandler that outputs code 299 warnings to the specified writer.
func NewWarningWriter(out io.Writer, opts WarningWriterOptions) *warningWriter {
h := &warningWriter{out: out, opts: opts}
if opts.Deduplicate {
h.written = map[string]struct{}{}
}
return h
}
const (
yellowColor = "\u001b[33;1m"
resetColor = "\u001b[0m"
)
// HandleWarningHeader prints warnings with code=299 to the configured writer.
func (w *warningWriter) HandleWarningHeader(code int, agent string, message string) {
if code != 299 || len(message) == 0 {
return
}
w.writtenLock.Lock()
defer w.writtenLock.Unlock()
if w.opts.Deduplicate {
if _, alreadyWritten := w.written[message]; alreadyWritten {
return
}
w.written[message] = struct{}{}
}
w.writtenCount++
if w.opts.Color {
fmt.Fprintf(w.out, "%sWarning:%s %s\n", yellowColor, resetColor, message)
} else {
fmt.Fprintf(w.out, "Warning: %s\n", message)
}
}
func (w *warningWriter) WarningCount() int {
w.writtenLock.Lock()
defer w.writtenLock.Unlock()
return w.writtenCount
}
func handleWarnings(headers http.Header, handler WarningHandler) []net.WarningHeader {
if handler == nil {
handler = getDefaultWarningHandler()
}
warnings, _ := net.ParseWarningHeaders(headers["Warning"])
for _, warning := range warnings {
handler.HandleWarningHeader(warning.Code, warning.Agent, warning.Text)
}
return warnings
}