feat(remote): Implement lazy distributed feature (#64)
THIS IS AN EXPERIMENTAL FEATURE/IMPLEMENTATION, AND IT MAY BE REMOVED IN THE FUTURE. Note that for now, it will be an undocumented feature.
This commit is contained in:
@ -5,11 +5,16 @@ import (
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/TwiN/gatus/v4/client"
|
||||
"github.com/TwiN/gatus/v4/config"
|
||||
"github.com/TwiN/gatus/v4/config/remote"
|
||||
"github.com/TwiN/gatus/v4/core"
|
||||
"github.com/TwiN/gatus/v4/storage/store"
|
||||
"github.com/TwiN/gatus/v4/storage/store/common"
|
||||
"github.com/TwiN/gatus/v4/storage/store/common/paging"
|
||||
@ -28,48 +33,89 @@ var (
|
||||
// EndpointStatuses handles requests to retrieve all EndpointStatus
|
||||
// Due to the size of the response, this function leverages a cache.
|
||||
// Must not be wrapped by GzipHandler
|
||||
func EndpointStatuses(writer http.ResponseWriter, r *http.Request) {
|
||||
page, pageSize := extractPageAndPageSizeFromRequest(r)
|
||||
gzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
|
||||
var exists bool
|
||||
var value interface{}
|
||||
if gzipped {
|
||||
writer.Header().Set("Content-Encoding", "gzip")
|
||||
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize))
|
||||
} else {
|
||||
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize))
|
||||
}
|
||||
var data []byte
|
||||
if !exists {
|
||||
var err error
|
||||
buffer := &bytes.Buffer{}
|
||||
gzipWriter := gzip.NewWriter(buffer)
|
||||
endpointStatuses, err := store.Get().GetAllEndpointStatuses(paging.NewEndpointStatusParams().WithResults(page, pageSize))
|
||||
if err != nil {
|
||||
log.Printf("[handler][EndpointStatuses] Failed to retrieve endpoint statuses: %s", err.Error())
|
||||
http.Error(writer, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
data, err = json.Marshal(endpointStatuses)
|
||||
if err != nil {
|
||||
log.Printf("[handler][EndpointStatuses] Unable to marshal object to JSON: %s", err.Error())
|
||||
http.Error(writer, "unable to marshal object to JSON", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
_, _ = gzipWriter.Write(data)
|
||||
_ = gzipWriter.Close()
|
||||
gzippedData := buffer.Bytes()
|
||||
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize), data, cacheTTL)
|
||||
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize), gzippedData, cacheTTL)
|
||||
func EndpointStatuses(cfg *config.Config) http.HandlerFunc {
|
||||
return func(writer http.ResponseWriter, r *http.Request) {
|
||||
page, pageSize := extractPageAndPageSizeFromRequest(r)
|
||||
gzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
|
||||
var exists bool
|
||||
var value interface{}
|
||||
if gzipped {
|
||||
data = gzippedData
|
||||
writer.Header().Set("Content-Encoding", "gzip")
|
||||
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize))
|
||||
} else {
|
||||
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize))
|
||||
}
|
||||
} else {
|
||||
data = value.([]byte)
|
||||
var data []byte
|
||||
if !exists {
|
||||
var err error
|
||||
buffer := &bytes.Buffer{}
|
||||
gzipWriter := gzip.NewWriter(buffer)
|
||||
endpointStatuses, err := store.Get().GetAllEndpointStatuses(paging.NewEndpointStatusParams().WithResults(page, pageSize))
|
||||
if err != nil {
|
||||
log.Printf("[handler][EndpointStatuses] Failed to retrieve endpoint statuses: %s", err.Error())
|
||||
http.Error(writer, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
// ALPHA: Retrieve endpoint statuses from remote instances
|
||||
if endpointStatusesFromRemote, err := getEndpointStatusesFromRemoteInstances(cfg.Remote); err != nil {
|
||||
log.Printf("[handler][EndpointStatuses] Silently failed to retrieve endpoint statuses from remote: %s", err.Error())
|
||||
} else if endpointStatusesFromRemote != nil {
|
||||
endpointStatuses = append(endpointStatuses, endpointStatusesFromRemote...)
|
||||
}
|
||||
// Marshal endpoint statuses to JSON
|
||||
data, err = json.Marshal(endpointStatuses)
|
||||
if err != nil {
|
||||
log.Printf("[handler][EndpointStatuses] Unable to marshal object to JSON: %s", err.Error())
|
||||
http.Error(writer, "unable to marshal object to JSON", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
_, _ = gzipWriter.Write(data)
|
||||
_ = gzipWriter.Close()
|
||||
gzippedData := buffer.Bytes()
|
||||
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize), data, cacheTTL)
|
||||
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize), gzippedData, cacheTTL)
|
||||
if gzipped {
|
||||
data = gzippedData
|
||||
}
|
||||
} else {
|
||||
data = value.([]byte)
|
||||
}
|
||||
writer.Header().Add("Content-Type", "application/json")
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
_, _ = writer.Write(data)
|
||||
}
|
||||
writer.Header().Add("Content-Type", "application/json")
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
_, _ = writer.Write(data)
|
||||
}
|
||||
|
||||
func getEndpointStatusesFromRemoteInstances(remoteConfig *remote.Config) ([]*core.EndpointStatus, error) {
|
||||
if remoteConfig == nil || len(remoteConfig.Instances) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var endpointStatusesFromAllRemotes []*core.EndpointStatus
|
||||
httpClient := client.GetHTTPClient(remoteConfig.ClientConfig)
|
||||
for _, instance := range remoteConfig.Instances {
|
||||
response, err := httpClient.Get(instance.URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
body, err := io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
_ = response.Body.Close()
|
||||
log.Printf("[handler][getEndpointStatusesFromRemoteInstances] Silently failed to retrieve endpoint statuses from %s: %s", instance.URL, err.Error())
|
||||
continue
|
||||
}
|
||||
var endpointStatuses []*core.EndpointStatus
|
||||
if err = json.Unmarshal(body, &endpointStatuses); err != nil {
|
||||
_ = response.Body.Close()
|
||||
log.Printf("[handler][getEndpointStatusesFromRemoteInstances] Silently failed to retrieve endpoint statuses from %s: %s", instance.URL, err.Error())
|
||||
continue
|
||||
}
|
||||
_ = response.Body.Close()
|
||||
for _, endpointStatus := range endpointStatuses {
|
||||
endpointStatus.Name = instance.EndpointPrefix + endpointStatus.Name
|
||||
}
|
||||
endpointStatusesFromAllRemotes = append(endpointStatusesFromAllRemotes, endpointStatuses...)
|
||||
}
|
||||
return endpointStatusesFromAllRemotes, nil
|
||||
}
|
||||
|
||||
// EndpointStatus retrieves a single core.EndpointStatus by group and endpoint name
|
||||
|
Reference in New Issue
Block a user