feat(storage): Add optional write-through cache to sql store

This commit is contained in:
TwiN
2022-08-11 20:47:29 -04:00
parent f01b66f083
commit 9de6334f21
6 changed files with 194 additions and 19 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/TwiN/gatus/v4/storage/store/common"
"github.com/TwiN/gatus/v4/storage/store/common/paging"
"github.com/TwiN/gatus/v4/util"
"github.com/TwiN/gocache/v2"
_ "github.com/lib/pq"
_ "modernc.org/sqlite"
)
@ -32,6 +33,8 @@ const (
resultsCleanUpThreshold = common.MaximumNumberOfResults + 10 // Maximum number of results before triggering a clean up
uptimeRetention = 7 * 24 * time.Hour
cacheTTL = 10 * time.Minute
)
var (
@ -49,10 +52,14 @@ type Store struct {
driver, path string
db *sql.DB
// writeThroughCache is a cache used to drastically decrease read latency by pre-emptively
// caching writes as they happen. If nil, writes are not cached.
writeThroughCache *gocache.Cache
}
// NewStore initializes the database and creates the schema if it doesn't already exist in the path specified
func NewStore(driver, path string) (*Store, error) {
func NewStore(driver, path string, caching bool) (*Store, error) {
if len(driver) == 0 {
return nil, ErrDatabaseDriverNotSpecified
}
@ -79,6 +86,9 @@ func NewStore(driver, path string) (*Store, error) {
_ = store.db.Close()
return nil, err
}
if caching {
store.writeThroughCache = gocache.NewCache().WithMaxSize(10000)
}
return store, nil
}
@ -323,6 +333,20 @@ func (s *Store) Insert(endpoint *core.Endpoint, result *core.Result) error {
}
}
}
// TODO: add field to automatically refresh the cache when a new result is inserted
if s.writeThroughCache != nil {
cacheKeysToRefresh := s.writeThroughCache.GetKeysByPattern(endpoint.Key()+"*", 0)
for _, cacheKey := range cacheKeysToRefresh {
s.writeThroughCache.Delete(cacheKey)
endpointKey, params, err := extractKeyAndParamsFromCacheKey(cacheKey)
if err != nil {
log.Printf("[sql][Insert] Silently deleting cache key %s instead of refreshing due to error: %s", cacheKey, err.Error())
continue
}
// Retrieve the endpoint status by key, which will in turn refresh the cache
_, _ = s.getEndpointStatusByKey(tx, endpointKey, params)
}
}
if err = tx.Commit(); err != nil {
_ = tx.Rollback()
}
@ -350,6 +374,11 @@ func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
log.Printf("[sql][DeleteAllEndpointStatusesNotInKeys] Failed to delete rows that do not belong to any of keys=%v: %s", keys, err.Error())
return 0
}
if s.writeThroughCache != nil {
// It's easier to just wipe out the entire cache than to try to find all keys that are not in the keys list
_ = s.writeThroughCache.DeleteKeysByPattern("*")
}
// Return number of rows deleted
rowsAffects, _ := result.RowsAffected()
return int(rowsAffects)
}
@ -357,6 +386,9 @@ func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
// Clear deletes everything from the store
func (s *Store) Clear() {
_, _ = s.db.Exec("DELETE FROM endpoints")
if s.writeThroughCache != nil {
_ = s.writeThroughCache.DeleteKeysByPattern("*")
}
}
// Save does nothing, because this store is immediately persistent.
@ -367,6 +399,10 @@ func (s *Store) Save() error {
// Close the database handle
func (s *Store) Close() {
_ = s.db.Close()
if s.writeThroughCache != nil {
// Clear the cache too. If the store's been closed, we don't want to keep the cache around.
_ = s.writeThroughCache.DeleteKeysByPattern("*")
}
}
// insertEndpoint inserts an endpoint in the store and returns the generated id of said endpoint
@ -479,6 +515,15 @@ func (s *Store) getAllEndpointKeys(tx *sql.Tx) (keys []string, err error) {
}
func (s *Store) getEndpointStatusByKey(tx *sql.Tx, key string, parameters *paging.EndpointStatusParams) (*core.EndpointStatus, error) {
var cacheKey string
if s.writeThroughCache != nil {
cacheKey = generateCacheKey(key, parameters)
if cachedEndpointStatus, exists := s.writeThroughCache.Get(cacheKey); exists {
if castedCachedEndpointStatus, ok := cachedEndpointStatus.(*core.EndpointStatus); ok {
return castedCachedEndpointStatus, nil
}
}
}
endpointID, group, endpointName, err := s.getEndpointIDGroupAndNameByKey(tx, key)
if err != nil {
return nil, err
@ -494,6 +539,9 @@ func (s *Store) getEndpointStatusByKey(tx *sql.Tx, key string, parameters *pagin
log.Printf("[sql][getEndpointStatusByKey] Failed to retrieve results for key=%s: %s", key, err.Error())
}
}
if s.writeThroughCache != nil {
s.writeThroughCache.SetWithTTL(cacheKey, endpointStatus, cacheTTL)
}
return endpointStatus, nil
}
@ -788,3 +836,29 @@ func (s *Store) deleteOldUptimeEntries(tx *sql.Tx, endpointID int64, maxAge time
_, err := tx.Exec("DELETE FROM endpoint_uptimes WHERE endpoint_id = $1 AND hour_unix_timestamp < $2", endpointID, maxAge.Unix())
return err
}
func generateCacheKey(endpointKey string, p *paging.EndpointStatusParams) string {
return fmt.Sprintf("%s-%d-%d-%d-%d", endpointKey, p.EventsPage, p.EventsPageSize, p.ResultsPage, p.ResultsPageSize)
}
func extractKeyAndParamsFromCacheKey(cacheKey string) (string, *paging.EndpointStatusParams, error) {
parts := strings.Split(cacheKey, "-")
if len(parts) < 5 {
return "", nil, fmt.Errorf("invalid cache key: %s", cacheKey)
}
params := &paging.EndpointStatusParams{}
var err error
if params.EventsPage, err = strconv.Atoi(parts[len(parts)-4]); err != nil {
return "", nil, fmt.Errorf("invalid cache key: %s", err.Error())
}
if params.EventsPageSize, err = strconv.Atoi(parts[len(parts)-3]); err != nil {
return "", nil, fmt.Errorf("invalid cache key: %s", err.Error())
}
if params.ResultsPage, err = strconv.Atoi(parts[len(parts)-2]); err != nil {
return "", nil, fmt.Errorf("invalid cache key: %s", err.Error())
}
if params.ResultsPageSize, err = strconv.Atoi(parts[len(parts)-1]); err != nil {
return "", nil, fmt.Errorf("invalid cache key: %s", err.Error())
}
return strings.Join(parts[:len(parts)-4], "-"), params, nil
}