Work on #61: Add support for ICMP

+ Update dependencies
This commit is contained in:
TwinProduction
2020-12-25 00:07:18 -05:00
parent c86173d46f
commit 83a5813daf
1004 changed files with 182274 additions and 64323 deletions

View File

@ -20,7 +20,6 @@ reviewers:
- resouer
- cjcullen
- rmmh
- lixiaobing10051267
- asalkeld
- juanvallejo
- lojies

View File

@ -17,8 +17,6 @@ limitations under the License.
package rest
import (
"fmt"
"mime"
"net/http"
"net/url"
"os"
@ -51,6 +49,28 @@ type Interface interface {
APIVersion() schema.GroupVersion
}
// ClientContentConfig controls how RESTClient communicates with the server.
//
// TODO: ContentConfig will be updated to accept a Negotiator instead of a
// NegotiatedSerializer and NegotiatedSerializer will be removed.
type ClientContentConfig struct {
// AcceptContentTypes specifies the types the client will accept and is optional.
// If not set, ContentType will be used to define the Accept header
AcceptContentTypes string
// ContentType specifies the wire format used to communicate with the server.
// This value will be set as the Accept header on requests made to the server if
// AcceptContentTypes is not set, and as the default content type on any object
// sent to the server. If not set, "application/json" is used.
ContentType string
// GroupVersion is the API version to talk to. Must be provided when initializing
// a RESTClient directly. When initializing a Client, will be set with the default
// code version. This is used as the default group version for VersionedParams.
GroupVersion schema.GroupVersion
// Negotiator is used for obtaining encoders and decoders for multiple
// supported media types.
Negotiator runtime.ClientNegotiator
}
// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources. The server should return a decodable API resource
@ -64,34 +84,31 @@ type RESTClient struct {
// versionedAPIPath is a path segment connecting the base URL to the resource root
versionedAPIPath string
// contentConfig is the information used to communicate with the server.
contentConfig ContentConfig
// serializers contain all serializers for underlying content type.
serializers Serializers
// content describes how a RESTClient encodes and decodes responses.
content ClientContentConfig
// creates BackoffManager that is passed to requests.
createBackoffMgr func() BackoffManager
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle flowcontrol.RateLimiter
// rateLimiter is shared among all requests created by this client unless specifically
// overridden.
rateLimiter flowcontrol.RateLimiter
// warningHandler is shared among all requests created by this client.
// If not set, defaultWarningHandler is used.
warningHandler WarningHandler
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}
type Serializers struct {
Encoder runtime.Encoder
Decoder runtime.Decoder
StreamingSerializer runtime.Serializer
Framer runtime.Framer
RenegotiatedDecoder func(contentType string, params map[string]string) (runtime.Decoder, error)
}
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
// such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
@ -99,31 +116,14 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
base.RawQuery = ""
base.Fragment = ""
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}
var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
}
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
content: config,
createBackoffMgr: readExpBackoffConfig,
Throttle: throttle,
Client: client,
rateLimiter: rateLimiter,
Client: client,
}, nil
}
@ -132,7 +132,7 @@ func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {
if c == nil {
return nil
}
return c.Throttle
return c.rateLimiter
}
// readExpBackoffConfig handles the internal logic of determining what the
@ -153,58 +153,6 @@ func readExpBackoffConfig() BackoffManager {
time.Duration(backoffDurationInt)*time.Second)}
}
// createSerializers creates all necessary serializers for given contentType.
// TODO: the negotiated serializer passed to this method should probably return
// serializers that control decoding and versioning without this package
// being aware of the types. Depends on whether RESTClient must deal with
// generic infrastructure.
func createSerializers(config ContentConfig) (*Serializers, error) {
mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes()
contentType := config.ContentType
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
}
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, fmt.Errorf("no serializers registered for %s", contentType)
}
info = mediaTypes[0]
}
internalGV := schema.GroupVersions{
{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{
Group: "",
Version: runtime.APIVersionInternal,
},
}
s := &Serializers{
Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion),
Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil
},
}
if info.StreamSerializer != nil {
s.StreamingSerializer = info.StreamSerializer.Serializer
s.Framer = info.StreamSerializer.Framer
}
return s, nil
}
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
@ -219,12 +167,7 @@ func createSerializers(config ContentConfig) (*Serializers, error) {
// list, ok := resp.(*api.PodList)
//
func (c *RESTClient) Verb(verb string) *Request {
backoff := c.createBackoffMgr()
if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0)
}
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout)
return NewRequest(c).Verb(verb)
}
// Post begins a POST request. Short for c.Verb("POST").
@ -254,5 +197,5 @@ func (c *RESTClient) Delete() *Request {
// APIVersion returns the APIVersion this RESTClient is expected to use.
func (c *RESTClient) APIVersion() schema.GroupVersion {
return *c.contentConfig.GroupVersion
return c.content.GroupVersion
}

View File

@ -23,6 +23,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
gruntime "runtime"
@ -37,7 +38,7 @@ import (
"k8s.io/client-go/transport"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
"k8s.io/klog/v2"
)
const (
@ -94,6 +95,10 @@ type Config struct {
// UserAgent is an optional field that specifies the caller of this request.
UserAgent string
// DisableCompression bypasses automatic GZip compression requests to the
// server.
DisableCompression bool
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options. Use WrapTransport
// to provide additional per-server middleware behavior.
@ -118,12 +123,23 @@ type Config struct {
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
// WarningHandler handles warnings in server responses.
// If not set, the default warning handler is used.
WarningHandler WarningHandler
// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
Timeout time.Duration
// Dial specifies the dial function for creating unencrypted TCP connections.
Dial func(ctx context.Context, network, address string) (net.Conn, error)
// Proxy is the the proxy func to be used for all requests made by this
// transport. If Proxy is nil, http.ProxyFromEnvironment is used. If Proxy
// returns a nil *URL, no proxy is used.
//
// socks5 proxying does not currently support spdy streaming endpoints.
Proxy func(*http.Request) (*url.URL, error)
// Version forces a specific version to be used (if registered)
// Do we need this?
// Version string
@ -207,6 +223,12 @@ type TLSClientConfig struct {
// CAData holds PEM-encoded bytes (typically read from a root certificates bundle).
// CAData takes precedence over CAFile
CAData []byte
// NextProtos is a list of supported application level protocols, in order of preference.
// Used to populate tls.Config.NextProtos.
// To indicate to the server http/1.1 is preferred over http/2, set to ["http/1.1", "h2"] (though the server is free to ignore that preference).
// To use only http/1.1, set to ["http/1.1"].
NextProtos []string
}
var _ fmt.Stringer = TLSClientConfig{}
@ -232,6 +254,7 @@ func (c TLSClientConfig) String() string {
CertData: c.CertData,
KeyData: c.KeyData,
CAData: c.CAData,
NextProtos: c.NextProtos,
}
// Explicitly mark non-empty credential fields as redacted.
if len(cc.CertData) != 0 {
@ -258,6 +281,9 @@ type ContentConfig struct {
GroupVersion *schema.GroupVersion
// NegotiatedSerializer is used for obtaining encoders and decoders for multiple
// supported media types.
//
// TODO: NegotiatedSerializer will be phased out as internal clients are removed
// from Kubernetes.
NegotiatedSerializer runtime.NegotiatedSerializer
}
@ -272,14 +298,6 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
if err != nil {
@ -299,7 +317,37 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
}
}
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient)
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
var gv schema.GroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
}
// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
@ -327,13 +375,37 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
}
}
versionConfig := config.ContentConfig
if versionConfig.GroupVersion == nil {
v := metav1.SchemeGroupVersion
versionConfig.GroupVersion = &v
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
gv := metav1.SchemeGroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
}
// SetKubernetesDefaults sets default values on the provided client config for accessing the
@ -487,7 +559,7 @@ func AddUserAgent(config *Config, userAgent string) *Config {
return config
}
// AnonymousClientConfig returns a copy of the given config with all user credentials (cert/key, bearer token, and username/password) removed
// AnonymousClientConfig returns a copy of the given config with all user credentials (cert/key, bearer token, and username/password) and custom transports (WrapTransport, Transport) removed
func AnonymousClientConfig(config *Config) *Config {
// copy only known safe fields
return &Config{
@ -499,15 +571,17 @@ func AnonymousClientConfig(config *Config) *Config {
ServerName: config.ServerName,
CAFile: config.TLSClientConfig.CAFile,
CAData: config.TLSClientConfig.CAData,
NextProtos: config.TLSClientConfig.NextProtos,
},
RateLimiter: config.RateLimiter,
UserAgent: config.UserAgent,
Transport: config.Transport,
WrapTransport: config.WrapTransport,
QPS: config.QPS,
Burst: config.Burst,
Timeout: config.Timeout,
Dial: config.Dial,
RateLimiter: config.RateLimiter,
WarningHandler: config.WarningHandler,
UserAgent: config.UserAgent,
DisableCompression: config.DisableCompression,
QPS: config.QPS,
Burst: config.Burst,
Timeout: config.Timeout,
Dial: config.Dial,
Proxy: config.Proxy,
}
}
@ -538,14 +612,18 @@ func CopyConfig(config *Config) *Config {
CertData: config.TLSClientConfig.CertData,
KeyData: config.TLSClientConfig.KeyData,
CAData: config.TLSClientConfig.CAData,
NextProtos: config.TLSClientConfig.NextProtos,
},
UserAgent: config.UserAgent,
Transport: config.Transport,
WrapTransport: config.WrapTransport,
QPS: config.QPS,
Burst: config.Burst,
RateLimiter: config.RateLimiter,
Timeout: config.Timeout,
Dial: config.Dial,
UserAgent: config.UserAgent,
DisableCompression: config.DisableCompression,
Transport: config.Transport,
WrapTransport: config.WrapTransport,
QPS: config.QPS,
Burst: config.Burst,
RateLimiter: config.RateLimiter,
WarningHandler: config.WarningHandler,
Timeout: config.Timeout,
Dial: config.Dial,
Proxy: config.Proxy,
}
}

View File

@ -21,7 +21,7 @@ import (
"net/http"
"sync"
"k8s.io/klog"
"k8s.io/klog/v2"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
@ -55,7 +55,7 @@ func RegisterAuthProviderPlugin(name string, plugin Factory) error {
pluginsLock.Lock()
defer pluginsLock.Unlock()
if _, found := plugins[name]; found {
return fmt.Errorf("Auth Provider Plugin %q was registered twice", name)
return fmt.Errorf("auth Provider Plugin %q was registered twice", name)
}
klog.V(4).Infof("Registered Auth Provider Plugin %q", name)
plugins[name] = plugin
@ -67,7 +67,7 @@ func GetAuthProvider(clusterAddress string, apc *clientcmdapi.AuthProviderConfig
defer pluginsLock.Unlock()
p, ok := plugins[apc.Name]
if !ok {
return nil, fmt.Errorf("No Auth Provider found for name %q", apc.Name)
return nil, fmt.Errorf("no Auth Provider found for name %q", apc.Name)
}
return p(clusterAddress, apc.Config, persister)
}

View File

@ -30,6 +30,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/net/http2"
@ -38,18 +39,23 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
utilclock "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/watch"
restclientwatch "k8s.io/client-go/rest/watch"
"k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
"k8s.io/klog/v2"
)
var (
// longThrottleLatency defines threshold for logging requests. All requests being
// throttle for more than longThrottleLatency will be logged.
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
// be logged.
longThrottleLatency = 50 * time.Millisecond
// extraLongThrottleLatency defines the threshold for logging requests at log level 2.
extraLongThrottleLatency = 1 * time.Second
)
// HTTPClient is an interface for testing a request object.
@ -60,8 +66,8 @@ type HTTPClient interface {
// ResponseWrapper is an interface for getting a response.
// The response may be either accessed as a raw data (the whole output is put into memory) or as a stream.
type ResponseWrapper interface {
DoRaw() ([]byte, error)
Stream() (io.ReadCloser, error)
DoRaw(context.Context) ([]byte, error)
Stream(context.Context) (io.ReadCloser, error)
}
// RequestConstructionError is returned when there's an error assembling a request.
@ -74,19 +80,23 @@ func (r *RequestConstructionError) Error() string {
return fmt.Sprintf("request construction error: '%v'", r.Err)
}
var noBackoff = &NoBackoff{}
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
// required
client HTTPClient
verb string
c *RESTClient
baseURL *url.URL
content ContentConfig
serializers Serializers
warningHandler WarningHandler
rateLimiter flowcontrol.RateLimiter
backoff BackoffManager
timeout time.Duration
maxRetries int
// generic components accessible via method setters
verb string
pathPrefix string
subpath string
params url.Values
@ -98,50 +108,69 @@ type Request struct {
resource string
resourceName string
subresource string
timeout time.Duration
// output
err error
body io.Reader
// This is only used for per-request timeouts, deadlines, and cancellations.
ctx context.Context
backoffMgr BackoffManager
throttle flowcontrol.RateLimiter
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
func NewRequest(c *RESTClient) *Request {
var backoff BackoffManager
if c.createBackoffMgr != nil {
backoff = c.createBackoffMgr()
}
if backoff == nil {
klog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
backoff = noBackoff
}
pathPrefix := "/"
if baseURL != nil {
pathPrefix = path.Join(pathPrefix, baseURL.Path)
var pathPrefix string
if c.base != nil {
pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
} else {
pathPrefix = path.Join("/", c.versionedAPIPath)
}
var timeout time.Duration
if c.Client != nil {
timeout = c.Client.Timeout
}
r := &Request{
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
serializers: serializers,
backoffMgr: backoff,
throttle: throttle,
timeout: timeout,
c: c,
rateLimiter: c.rateLimiter,
backoff: backoff,
timeout: timeout,
pathPrefix: pathPrefix,
maxRetries: 10,
warningHandler: c.warningHandler,
}
switch {
case len(content.AcceptContentTypes) > 0:
r.SetHeader("Accept", content.AcceptContentTypes)
case len(content.ContentType) > 0:
r.SetHeader("Accept", content.ContentType+", */*")
case len(c.content.AcceptContentTypes) > 0:
r.SetHeader("Accept", c.content.AcceptContentTypes)
case len(c.content.ContentType) > 0:
r.SetHeader("Accept", c.content.ContentType+", */*")
}
return r
}
// NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
return NewRequest(&RESTClient{
base: base,
versionedAPIPath: versionedAPIPath,
content: content,
Client: client,
})
}
// Verb sets the verb this request will use.
func (r *Request) Verb(verb string) *Request {
r.verb = verb
return r
}
// Prefix adds segments to the relative beginning to the request path. These
// items will be placed before the optional Namespace, Resource, or Name sections.
// Setting AbsPath will clear any previously set Prefix segments
@ -184,17 +213,24 @@ func (r *Request) Resource(resource string) *Request {
// or defaults to the stub implementation if nil is provided
func (r *Request) BackOff(manager BackoffManager) *Request {
if manager == nil {
r.backoffMgr = &NoBackoff{}
r.backoff = &NoBackoff{}
return r
}
r.backoffMgr = manager
r.backoff = manager
return r
}
// WarningHandler sets the handler this client uses when warning headers are encountered.
// If set to nil, this client will use the default warning handler (see SetDefaultWarningHandler).
func (r *Request) WarningHandler(handler WarningHandler) *Request {
r.warningHandler = handler
return r
}
// Throttle receives a rate-limiter and sets or replaces an existing request limiter
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
r.throttle = limiter
r.rateLimiter = limiter
return r
}
@ -272,8 +308,8 @@ func (r *Request) AbsPath(segments ...string) *Request {
if r.err != nil {
return r
}
r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
// preserve any trailing slashes for legacy behavior
r.pathPrefix += "/"
}
@ -317,7 +353,7 @@ func (r *Request) Param(paramName, s string) *Request {
// VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive).
func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
}
func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
@ -367,6 +403,18 @@ func (r *Request) Timeout(d time.Duration) *Request {
return r
}
// MaxRetries makes the request use the given integer as a ceiling of retrying upon receiving
// "Retry-After" headers and 429 status-code in the response. The default is 10 unless this
// function is specifically called with a different value.
// A zero maxRetries prevent it from doing retires and return an error immediately.
func (r *Request) MaxRetries(maxRetries int) *Request {
if maxRetries < 0 {
maxRetries = 0
}
r.maxRetries = maxRetries
return r
}
// Body makes the request use obj as the body. Optional.
// If obj is a string, try to read a file of that name.
// If obj is a []byte, send it directly.
@ -397,27 +445,25 @@ func (r *Request) Body(obj interface{}) *Request {
if reflect.ValueOf(t).IsNil() {
return r
}
data, err := runtime.Encode(r.serializers.Encoder, t)
encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
if err != nil {
r.err = err
return r
}
data, err := runtime.Encode(encoder, t)
if err != nil {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = bytes.NewReader(data)
r.SetHeader("Content-Type", r.content.ContentType)
r.SetHeader("Content-Type", r.c.content.ContentType)
default:
r.err = fmt.Errorf("unknown type used for body: %+v", obj)
}
return r
}
// Context adds a context to the request. Contexts are only used for
// timeouts, deadlines, and cancellations.
func (r *Request) Context(ctx context.Context) *Request {
r.ctx = ctx
return r
}
// URL returns the current working URL.
func (r *Request) URL() *url.URL {
p := r.pathPrefix
@ -433,8 +479,8 @@ func (r *Request) URL() *url.URL {
}
finalURL := &url.URL{}
if r.baseURL != nil {
*finalURL = *r.baseURL
if r.c.base != nil {
*finalURL = *r.c.base
}
finalURL.Path = p
@ -468,8 +514,8 @@ func (r Request) finalURLTemplate() url.URL {
segments := strings.Split(r.URL().Path, "/")
groupIndex := 0
index := 0
if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
groupIndex += len(strings.Split(r.baseURL.Path, "/"))
if r.URL() != nil && r.c.base != nil && strings.Contains(r.URL().Path, r.c.base.Path) {
groupIndex += len(strings.Split(r.c.base.Path, "/"))
}
if groupIndex >= len(segments) {
return *url
@ -521,68 +567,119 @@ func (r Request) finalURLTemplate() url.URL {
return *url
}
func (r *Request) tryThrottle() {
now := time.Now()
if r.throttle != nil {
r.throttle.Accept()
func (r *Request) tryThrottle(ctx context.Context) error {
if r.rateLimiter == nil {
return nil
}
if latency := time.Since(now); latency > longThrottleLatency {
klog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
now := time.Now()
err := r.rateLimiter.Wait(ctx)
latency := time.Since(now)
if latency > longThrottleLatency {
klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
}
if latency > extraLongThrottleLatency {
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
// but we use a throttled logger to prevent spamming.
globalThrottledLogger.Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
}
metrics.RateLimiterLatency.Observe(r.verb, r.finalURLTemplate(), latency)
return err
}
type throttleSettings struct {
logLevel klog.Level
minLogInterval time.Duration
lastLogTime time.Time
lock sync.RWMutex
}
type throttledLogger struct {
clock utilclock.PassiveClock
settings []*throttleSettings
}
var globalThrottledLogger = &throttledLogger{
clock: utilclock.RealClock{},
settings: []*throttleSettings{
{
logLevel: 2,
minLogInterval: 1 * time.Second,
}, {
logLevel: 0,
minLogInterval: 10 * time.Second,
},
},
}
func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
for _, setting := range b.settings {
if bool(klog.V(setting.logLevel).Enabled()) {
// Return early without write locking if possible.
if func() bool {
setting.lock.RLock()
defer setting.lock.RUnlock()
return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
}() {
setting.lock.Lock()
defer setting.lock.Unlock()
if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
setting.lastLogTime = b.clock.Now()
return setting.logLevel, true
}
}
return -1, false
}
}
return -1, false
}
// Infof will write a log message at each logLevel specified by the reciever's throttleSettings
// as long as it hasn't written a log message more recently than minLogInterval.
func (b *throttledLogger) Infof(message string, args ...interface{}) {
if logLevel, ok := b.attemptToLog(); ok {
klog.V(logLevel).Infof(message, args...)
}
}
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) {
return r.WatchWithSpecificDecoders(
func(body io.ReadCloser) streaming.Decoder {
framer := r.serializers.Framer.NewFrameReader(body)
return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
},
r.serializers.Decoder,
)
}
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
// Returns a watch.Interface, or an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.throttle here.
// don't use r.rateLimiter here.
if r.err != nil {
return nil, r.err
}
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return nil, err
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
return watch.NewEmptyWatch(), nil
}
return nil, err
@ -592,18 +689,38 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
if result := r.transformResponse(resp, req); result.err != nil {
return nil, result.err
}
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}
wrapperDecoder := wrapperDecoderFn(resp.Body)
return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
}
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
if err != nil {
return nil, err
}
handleWarnings(resp.Header, r.warningHandler)
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), nil
}
// updateURLMetrics is a convenience function for pushing metrics.
// It also handles corner cases for incomplete/invalid request data.
func updateURLMetrics(req *Request, resp *http.Response, err error) {
url := "none"
if req.baseURL != nil {
url = req.baseURL.Host
if req.c.base != nil {
url = req.c.base.Host
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
@ -620,34 +737,37 @@ func updateURLMetrics(req *Request, resp *http.Response, err error) {
// Returns io.ReadCloser which could be used for streaming of the response, or an error
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream() (io.ReadCloser, error) {
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if r.err != nil {
return nil, r.err
}
r.tryThrottle()
if err := r.tryThrottle(ctx); err != nil {
return nil, err
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, nil)
if err != nil {
return nil, err
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
if r.body != nil {
req.Body = ioutil.NopCloser(r.body)
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if r.c.base != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
}
if err != nil {
@ -656,6 +776,7 @@ func (r *Request) Stream() (io.ReadCloser, error) {
switch {
case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
handleWarnings(resp.Header, r.warningHandler)
return resp.Body, nil
default:
@ -671,11 +792,38 @@ func (r *Request) Stream() (io.ReadCloser, error) {
}
}
// requestPreflightCheck looks for common programmer errors on Request.
//
// We tackle here two programmer mistakes. The first one is to try to create
// something(POST) using an empty string as namespace with namespaceSet as
// true. If namespaceSet is true then namespace should also be defined. The
// second mistake is, when under the same circumstances, the programmer tries
// to GET, PUT or DELETE a named resource(resourceName != ""), again, if
// namespaceSet is true then namespace must not be empty.
func (r *Request) requestPreflightCheck() error {
if !r.namespaceSet {
return nil
}
if len(r.namespace) > 0 {
return nil
}
switch r.verb {
case "POST":
return fmt.Errorf("an empty namespace may not be set during creation")
case "GET", "PUT", "DELETE":
if len(r.resourceName) > 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
}
return nil
}
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
//Metrics for total request latency
start := time.Now()
defer func() {
@ -687,69 +835,75 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
return r.err
}
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set during creation")
if err := r.requestPreflightCheck(); err != nil {
return err
}
client := r.client
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
// Throttle the first try before setting up the timeout configured on the
// client. We don't want a throttled client to return timeouts to callers
// before it makes a single request.
if err := r.tryThrottle(ctx); err != nil {
return err
}
if r.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.timeout)
defer cancel()
}
// Right now we make about ten retry attempts if we get a Retry-After response.
maxRetries := 10
retries := 0
for {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return err
}
if r.timeout > 0 {
if r.ctx == nil {
r.ctx = context.Background()
}
var cancelFn context.CancelFunc
r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
defer cancelFn()
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req = req.WithContext(ctx)
req.Header = r.headers
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retries > 0 {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal throttler.
r.tryThrottle()
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottle(ctx); err != nil {
return err
}
}
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
// "Connection reset by peer" is usually a transient error.
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as
// they are not idempotent.
if !net.IsConnectionReset(err) || r.verb != "GET" {
if r.verb != "GET" {
return err
}
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
// For connection errors and apiserver shutdown errors retry.
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
} else {
return err
}
}
@ -766,7 +920,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}()
retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
if seconds, wait := checkWait(resp); wait && retries <= r.maxRetries {
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
@ -777,7 +931,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}
klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
r.backoff.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
@ -793,15 +947,11 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
// processing.
//
// Error type:
// * If the request can't be constructed, or an error happened earlier while building its
// arguments: *RequestConstructionError
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {
r.tryThrottle()
func (r *Request) Do(ctx context.Context) Result {
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
@ -811,11 +961,9 @@ func (r *Request) Do() Result {
}
// DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw() ([]byte, error) {
r.tryThrottle()
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result.body, result.err = ioutil.ReadAll(resp.Body)
glogBody("Response Body", result.body)
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
@ -845,13 +993,13 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
// 3. Apiserver closes connection.
// 4. client-go should catch this and return an error.
klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
streamErr := fmt.Errorf("Stream error %#v when reading response body, may be caused by closed connection. Please retry.", err)
streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
return Result{
err: streamErr,
}
default:
klog.Errorf("Unexpected error when reading response body: %#v", err)
unexpectedErr := fmt.Errorf("Unexpected error %#v when reading response body. Please retry.", err)
klog.Errorf("Unexpected error when reading response body: %v", err)
unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err)
return Result{
err: unexpectedErr,
}
@ -861,14 +1009,18 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
glogBody("Response Body", body)
// verify the content type is accurate
var decoder runtime.Decoder
contentType := resp.Header.Get("Content-Type")
decoder := r.serializers.Decoder
if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
if len(contentType) == 0 {
contentType = r.c.content.ContentType
}
if len(contentType) > 0 {
var err error
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return Result{err: errors.NewInternalError(err)}
}
decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
if err != nil {
// if we fail to negotiate a decoder, treat this as an unstructured error
switch {
@ -881,6 +1033,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
body: body,
contentType: contentType,
statusCode: resp.StatusCode,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
}
@ -899,6 +1052,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
statusCode: resp.StatusCode,
decoder: decoder,
err: err,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
@ -907,6 +1061,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
contentType: contentType,
statusCode: resp.StatusCode,
decoder: decoder,
warnings: handleWarnings(resp.Header, r.warningHandler),
}
}
@ -914,11 +1069,11 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
func truncateBody(body string) string {
max := 0
switch {
case bool(klog.V(10)):
case bool(klog.V(10).Enabled()):
return body
case bool(klog.V(9)):
case bool(klog.V(9).Enabled()):
max = 10240
case bool(klog.V(8)):
case bool(klog.V(8).Enabled()):
max = 1024
}
@ -933,7 +1088,7 @@ func truncateBody(body string) string {
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
// whether the body is printable.
func glogBody(prefix string, body []byte) {
if klog.V(8) {
if klog.V(8).Enabled() {
if bytes.IndexFunc(body, func(r rune) bool {
return r < 0x0a
}) != -1 {
@ -988,7 +1143,7 @@ func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool,
}
var groupResource schema.GroupResource
if len(r.resource) > 0 {
groupResource.Group = r.content.GroupVersion.Group
groupResource.Group = r.c.content.GroupVersion.Group
groupResource.Resource = r.resource
}
return errors.NewGenericServerResponse(
@ -1042,6 +1197,7 @@ func retryAfterSeconds(resp *http.Response) (int, bool) {
// Result contains the result of calling Request.Do().
type Result struct {
body []byte
warnings []net.WarningHeader
contentType string
err error
statusCode int
@ -1155,6 +1311,11 @@ func (r Result) Error() error {
return r.err
}
// Warnings returns any warning headers received in the response
func (r Result) Warnings() []net.WarningHeader {
return r.warnings
}
// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
var NameMayNotBe = []string{".", ".."}

View File

@ -61,9 +61,10 @@ func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTrip
// TransportConfig converts a client config to an appropriate transport config.
func (c *Config) TransportConfig() (*transport.Config, error) {
conf := &transport.Config{
UserAgent: c.UserAgent,
Transport: c.Transport,
WrapTransport: c.WrapTransport,
UserAgent: c.UserAgent,
Transport: c.Transport,
WrapTransport: c.WrapTransport,
DisableCompression: c.DisableCompression,
TLS: transport.TLSConfig{
Insecure: c.Insecure,
ServerName: c.ServerName,
@ -73,6 +74,7 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
CertData: c.CertData,
KeyFile: c.KeyFile,
KeyData: c.KeyData,
NextProtos: c.NextProtos,
},
Username: c.Username,
Password: c.Password,
@ -83,7 +85,8 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
Groups: c.Impersonate.Groups,
Extra: c.Impersonate.Extra,
},
Dial: c.Dial,
Dial: c.Dial,
Proxy: c.Proxy,
}
if c.ExecProvider != nil && c.AuthProvider != nil {

View File

@ -22,7 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
"k8s.io/klog/v2"
)
// Set of resp. Codes that we backoff for.

144
vendor/k8s.io/client-go/rest/warnings.go generated vendored Normal file
View File

@ -0,0 +1,144 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"fmt"
"io"
"net/http"
"sync"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/net"
)
// WarningHandler is an interface for handling warning headers
type WarningHandler interface {
// HandleWarningHeader is called with the warn code, agent, and text when a warning header is countered.
HandleWarningHeader(code int, agent string, text string)
}
var (
defaultWarningHandler WarningHandler = WarningLogger{}
defaultWarningHandlerLock sync.RWMutex
)
// SetDefaultWarningHandler sets the default handler client uses when warning headers are encountered.
// By default, warnings are printed to stderr.
func SetDefaultWarningHandler(l WarningHandler) {
defaultWarningHandlerLock.Lock()
defer defaultWarningHandlerLock.Unlock()
defaultWarningHandler = l
}
func getDefaultWarningHandler() WarningHandler {
defaultWarningHandlerLock.RLock()
defer defaultWarningHandlerLock.RUnlock()
l := defaultWarningHandler
return l
}
// NoWarnings is an implementation of WarningHandler that suppresses warnings.
type NoWarnings struct{}
func (NoWarnings) HandleWarningHeader(code int, agent string, message string) {}
// WarningLogger is an implementation of WarningHandler that logs code 299 warnings
type WarningLogger struct{}
func (WarningLogger) HandleWarningHeader(code int, agent string, message string) {
if code != 299 || len(message) == 0 {
return
}
klog.Warning(message)
}
type warningWriter struct {
// out is the writer to output warnings to
out io.Writer
// opts contains options controlling warning output
opts WarningWriterOptions
// writtenLock guards written and writtenCount
writtenLock sync.Mutex
writtenCount int
written map[string]struct{}
}
// WarningWriterOptions controls the behavior of a WarningHandler constructed using NewWarningWriter()
type WarningWriterOptions struct {
// Deduplicate indicates a given warning message should only be written once.
// Setting this to true in a long-running process handling many warnings can result in increased memory use.
Deduplicate bool
// Color indicates that warning output can include ANSI color codes
Color bool
}
// NewWarningWriter returns an implementation of WarningHandler that outputs code 299 warnings to the specified writer.
func NewWarningWriter(out io.Writer, opts WarningWriterOptions) *warningWriter {
h := &warningWriter{out: out, opts: opts}
if opts.Deduplicate {
h.written = map[string]struct{}{}
}
return h
}
const (
yellowColor = "\u001b[33;1m"
resetColor = "\u001b[0m"
)
// HandleWarningHeader prints warnings with code=299 to the configured writer.
func (w *warningWriter) HandleWarningHeader(code int, agent string, message string) {
if code != 299 || len(message) == 0 {
return
}
w.writtenLock.Lock()
defer w.writtenLock.Unlock()
if w.opts.Deduplicate {
if _, alreadyWritten := w.written[message]; alreadyWritten {
return
}
w.written[message] = struct{}{}
}
w.writtenCount++
if w.opts.Color {
fmt.Fprintf(w.out, "%sWarning:%s %s\n", yellowColor, resetColor, message)
} else {
fmt.Fprintf(w.out, "Warning: %s\n", message)
}
}
func (w *warningWriter) WarningCount() int {
w.writtenLock.Lock()
defer w.writtenLock.Unlock()
return w.writtenCount
}
func handleWarnings(headers http.Header, handler WarningHandler) []net.WarningHeader {
if handler == nil {
handler = getDefaultWarningHandler()
}
warnings, _ := net.ParseWarningHeaders(headers["Warning"])
for _, warning := range warnings {
handler.HandleWarningHeader(warning.Code, warning.Agent, warning.Text)
}
return warnings
}

View File

@ -54,7 +54,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
return "", nil, fmt.Errorf("unable to decode to metav1.Event")
}
switch got.Type {
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error):
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error), string(watch.Bookmark):
default:
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}

View File

@ -38,6 +38,11 @@ func (in *TLSClientConfig) DeepCopyInto(out *TLSClientConfig) {
*out = make([]byte, len(*in))
copy(*out, *in)
}
if in.NextProtos != nil {
in, out := &in.NextProtos, &out.NextProtos
*out = make([]string, len(*in))
copy(*out, *in)
}
return
}