feat(alerting): Persist triggered alerts across application restart (#764)
* feat(alerting): Persist triggered alerts across application restart Fixes #679 * test(alerting): Add numerous tests related to alerts
This commit is contained in:
@ -5,6 +5,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/TwiN/gatus/v5/alerting/alert"
|
||||
"github.com/TwiN/gatus/v5/config/endpoint"
|
||||
"github.com/TwiN/gatus/v5/storage/store/common"
|
||||
"github.com/TwiN/gatus/v5/storage/store/common/paging"
|
||||
@ -174,6 +175,37 @@ func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
|
||||
return s.cache.DeleteAll(keysToDelete)
|
||||
}
|
||||
|
||||
// GetTriggeredEndpointAlert returns whether the triggered alert for the specified endpoint as well as the necessary information to resolve it
|
||||
//
|
||||
// Always returns that the alert does not exist for the in-memory store since it does not support persistence across restarts
|
||||
func (s *Store) GetTriggeredEndpointAlert(ep *endpoint.Endpoint, alert *alert.Alert) (exists bool, resolveKey string, numberOfSuccessesInARow int, err error) {
|
||||
return false, "", 0, nil
|
||||
}
|
||||
|
||||
// UpsertTriggeredEndpointAlert inserts/updates a triggered alert for an endpoint
|
||||
// Used for persistence of triggered alerts across application restarts
|
||||
//
|
||||
// Does nothing for the in-memory store since it does not support persistence across restarts
|
||||
func (s *Store) UpsertTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteTriggeredEndpointAlert deletes a triggered alert for an endpoint
|
||||
//
|
||||
// Does nothing for the in-memory store since it does not support persistence across restarts
|
||||
func (s *Store) DeleteTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteAllTriggeredAlertsNotInChecksumsByEndpoint removes all triggered alerts owned by an endpoint whose alert
|
||||
// configurations are not provided in the checksums list.
|
||||
// This prevents triggered alerts that have been removed or modified from lingering in the database.
|
||||
//
|
||||
// Does nothing for the in-memory store since it does not support persistence across restarts
|
||||
func (s *Store) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.Endpoint, checksums []string) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Clear deletes everything from the store
|
||||
func (s *Store) Clear() {
|
||||
s.cache.Clear()
|
||||
|
@ -16,7 +16,7 @@ func (s *Store) createPostgresSchema() error {
|
||||
_, err = s.db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS endpoint_events (
|
||||
endpoint_event_id BIGSERIAL PRIMARY KEY,
|
||||
endpoint_id INTEGER NOT NULL REFERENCES endpoints(endpoint_id) ON DELETE CASCADE,
|
||||
endpoint_id BIGINT NOT NULL REFERENCES endpoints(endpoint_id) ON DELETE CASCADE,
|
||||
event_type TEXT NOT NULL,
|
||||
event_timestamp TIMESTAMP NOT NULL
|
||||
)
|
||||
@ -66,7 +66,20 @@ func (s *Store) createPostgresSchema() error {
|
||||
UNIQUE(endpoint_id, hour_unix_timestamp)
|
||||
)
|
||||
`)
|
||||
// Silent table modifications
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS endpoint_alerts_triggered (
|
||||
endpoint_alert_trigger_id BIGSERIAL PRIMARY KEY,
|
||||
endpoint_id BIGINT NOT NULL REFERENCES endpoints(endpoint_id) ON DELETE CASCADE,
|
||||
configuration_checksum TEXT NOT NULL,
|
||||
resolve_key TEXT NOT NULL,
|
||||
number_of_successes_in_a_row INTEGER NOT NULL,
|
||||
UNIQUE(endpoint_id, configuration_checksum)
|
||||
)
|
||||
`)
|
||||
// Silent table modifications TODO: Remove this in v6.0.0
|
||||
_, _ = s.db.Exec(`ALTER TABLE endpoint_results ADD IF NOT EXISTS domain_expiration BIGINT NOT NULL DEFAULT 0`)
|
||||
return err
|
||||
}
|
||||
|
@ -66,7 +66,20 @@ func (s *Store) createSQLiteSchema() error {
|
||||
UNIQUE(endpoint_id, hour_unix_timestamp)
|
||||
)
|
||||
`)
|
||||
// Silent table modifications TODO: Remove this
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS endpoint_alerts_triggered (
|
||||
endpoint_alert_trigger_id INTEGER PRIMARY KEY,
|
||||
endpoint_id INTEGER NOT NULL REFERENCES endpoints(endpoint_id) ON DELETE CASCADE,
|
||||
configuration_checksum TEXT NOT NULL,
|
||||
resolve_key TEXT NOT NULL,
|
||||
number_of_successes_in_a_row INTEGER NOT NULL,
|
||||
UNIQUE(endpoint_id, configuration_checksum)
|
||||
)
|
||||
`)
|
||||
// Silent table modifications TODO: Remove this in v6.0.0
|
||||
_, _ = s.db.Exec(`ALTER TABLE endpoint_results ADD domain_expiration INTEGER NOT NULL DEFAULT 0`)
|
||||
return err
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/TwiN/gatus/v5/alerting/alert"
|
||||
"github.com/TwiN/gatus/v5/config/endpoint"
|
||||
"github.com/TwiN/gatus/v5/storage/store/common"
|
||||
"github.com/TwiN/gatus/v5/storage/store/common/paging"
|
||||
@ -27,9 +28,9 @@ const (
|
||||
// for aesthetic purposes, I deemed it wasn't worth the performance impact of yet another one-to-many table.
|
||||
arraySeparator = "|~|"
|
||||
|
||||
uptimeCleanUpThreshold = 10 * 24 * time.Hour // Maximum uptime age before triggering a clean up
|
||||
eventsCleanUpThreshold = common.MaximumNumberOfEvents + 10 // Maximum number of events before triggering a clean up
|
||||
resultsCleanUpThreshold = common.MaximumNumberOfResults + 10 // Maximum number of results before triggering a clean up
|
||||
uptimeCleanUpThreshold = 10 * 24 * time.Hour // Maximum uptime age before triggering a cleanup
|
||||
eventsCleanUpThreshold = common.MaximumNumberOfEvents + 10 // Maximum number of events before triggering a cleanup
|
||||
resultsCleanUpThreshold = common.MaximumNumberOfResults + 10 // Maximum number of results before triggering a cleanup
|
||||
|
||||
uptimeRetention = 7 * 24 * time.Hour
|
||||
|
||||
@ -234,12 +235,12 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
|
||||
// Endpoint doesn't exist in the database, insert it
|
||||
if endpointID, err = s.insertEndpoint(tx, ep); err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("[sql.Insert] Failed to create endpoint with group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to create endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("[sql.Insert] Failed to retrieve id of endpoint with group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to retrieve id of endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -255,7 +256,7 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
|
||||
numberOfEvents, err := s.getNumberOfEventsByEndpointID(tx, endpointID)
|
||||
if err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql.Insert] Failed to retrieve total number of events for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to retrieve total number of events for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
}
|
||||
if numberOfEvents == 0 {
|
||||
// There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event
|
||||
@ -265,18 +266,18 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
|
||||
})
|
||||
if err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql.Insert] Failed to insert event=%s for group=%s; endpoint=%s: %s", endpoint.EventStart, ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", endpoint.EventStart, ep.Key(), err.Error())
|
||||
}
|
||||
event := endpoint.NewEventFromResult(result)
|
||||
if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql.Insert] Failed to insert event=%s for group=%s; endpoint=%s: %s", event.Type, ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
|
||||
}
|
||||
} else {
|
||||
// Get the success value of the previous result
|
||||
var lastResultSuccess bool
|
||||
if lastResultSuccess, err = s.getLastEndpointResultSuccessValue(tx, endpointID); err != nil {
|
||||
log.Printf("[sql.Insert] Failed to retrieve outcome of previous result for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to retrieve outcome of previous result for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
} else {
|
||||
// If we managed to retrieve the outcome of the previous result, we'll compare it with the new result.
|
||||
// If the final outcome (success or failure) of the previous and the new result aren't the same, it means
|
||||
@ -286,7 +287,7 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
|
||||
event := endpoint.NewEventFromResult(result)
|
||||
if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql.Insert] Failed to insert event=%s for group=%s; endpoint=%s: %s", event.Type, ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -295,40 +296,40 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
|
||||
// (since we're only deleting MaximumNumberOfEvents at a time instead of 1)
|
||||
if numberOfEvents > eventsCleanUpThreshold {
|
||||
if err = s.deleteOldEndpointEvents(tx, endpointID); err != nil {
|
||||
log.Printf("[sql.Insert] Failed to delete old events for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to delete old events for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
// Second, we need to insert the result.
|
||||
if err = s.insertEndpointResult(tx, endpointID, result); err != nil {
|
||||
log.Printf("[sql.Insert] Failed to insert result for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to insert result for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
_ = tx.Rollback() // If we can't insert the result, we'll rollback now since there's no point continuing
|
||||
return err
|
||||
}
|
||||
// Clean up old results
|
||||
numberOfResults, err := s.getNumberOfResultsByEndpointID(tx, endpointID)
|
||||
if err != nil {
|
||||
log.Printf("[sql.Insert] Failed to retrieve total number of results for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to retrieve total number of results for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
} else {
|
||||
if numberOfResults > resultsCleanUpThreshold {
|
||||
if err = s.deleteOldEndpointResults(tx, endpointID); err != nil {
|
||||
log.Printf("[sql.Insert] Failed to delete old results for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to delete old results for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
// Finally, we need to insert the uptime data.
|
||||
// Because the uptime data significantly outlives the results, we can't rely on the results for determining the uptime
|
||||
if err = s.updateEndpointUptime(tx, endpointID, result); err != nil {
|
||||
log.Printf("[sql.Insert] Failed to update uptime for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to update uptime for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
}
|
||||
// Clean up old uptime entries
|
||||
ageOfOldestUptimeEntry, err := s.getAgeOfOldestEndpointUptimeEntry(tx, endpointID)
|
||||
if err != nil {
|
||||
log.Printf("[sql.Insert] Failed to retrieve oldest endpoint uptime entry for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to retrieve oldest endpoint uptime entry for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
} else {
|
||||
if ageOfOldestUptimeEntry > uptimeCleanUpThreshold {
|
||||
if err = s.deleteOldUptimeEntries(tx, endpointID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil {
|
||||
log.Printf("[sql.Insert] Failed to delete old uptime entries for group=%s; endpoint=%s: %s", ep.Group, ep.Name, err.Error())
|
||||
log.Printf("[sql.Insert] Failed to delete old uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -374,6 +375,8 @@ func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
|
||||
}
|
||||
if s.writeThroughCache != nil {
|
||||
// It's easier to just wipe out the entire cache than to try to find all keys that are not in the keys list
|
||||
// This only happens on start and during tests, so it's fine for us to just clear the cache without worrying
|
||||
// about performance
|
||||
_ = s.writeThroughCache.DeleteKeysByPattern("*")
|
||||
}
|
||||
// Return number of rows deleted
|
||||
@ -381,6 +384,111 @@ func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
|
||||
return int(rowsAffects)
|
||||
}
|
||||
|
||||
// GetTriggeredEndpointAlert returns whether the triggered alert for the specified endpoint as well as the necessary information to resolve it
|
||||
func (s *Store) GetTriggeredEndpointAlert(ep *endpoint.Endpoint, alert *alert.Alert) (exists bool, resolveKey string, numberOfSuccessesInARow int, err error) {
|
||||
//log.Printf("[sql.GetTriggeredEndpointAlert] Getting triggered alert with checksum=%s for endpoint with key=%s", alert.Checksum(), ep.Key())
|
||||
err = s.db.QueryRow(
|
||||
"SELECT resolve_key, number_of_successes_in_a_row FROM endpoint_alerts_triggered WHERE endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1 LIMIT 1) AND configuration_checksum = $2",
|
||||
ep.Key(),
|
||||
alert.Checksum(),
|
||||
).Scan(&resolveKey, &numberOfSuccessesInARow)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, "", 0, nil
|
||||
}
|
||||
return false, "", 0, err
|
||||
}
|
||||
return true, resolveKey, numberOfSuccessesInARow, nil
|
||||
}
|
||||
|
||||
// UpsertTriggeredEndpointAlert inserts/updates a triggered alert for an endpoint
|
||||
// Used for persistence of triggered alerts across application restarts
|
||||
func (s *Store) UpsertTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
|
||||
//log.Printf("[sql.UpsertTriggeredEndpointAlert] Upserting triggered alert with checksum=%s for endpoint with key=%s", triggeredAlert.Checksum(), ep.Key())
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
endpointID, err := s.getEndpointID(tx, ep)
|
||||
if err != nil {
|
||||
if errors.Is(err, common.ErrEndpointNotFound) {
|
||||
// Endpoint doesn't exist in the database, insert it
|
||||
// This shouldn't happen, but we'll handle it anyway
|
||||
if endpointID, err = s.insertEndpoint(tx, ep); err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("[sql.UpsertTriggeredEndpointAlert] Failed to create endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("[sql.UpsertTriggeredEndpointAlert] Failed to retrieve id of endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
_, err = tx.Exec(
|
||||
`
|
||||
INSERT INTO endpoint_alerts_triggered (endpoint_id, configuration_checksum, resolve_key, number_of_successes_in_a_row)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT(endpoint_id, configuration_checksum) DO UPDATE SET
|
||||
resolve_key = $3,
|
||||
number_of_successes_in_a_row = $4
|
||||
`,
|
||||
endpointID,
|
||||
triggeredAlert.Checksum(),
|
||||
triggeredAlert.ResolveKey,
|
||||
ep.NumberOfSuccessesInARow, // We only persist NumberOfSuccessesInARow, because all alerts in this table are already triggered
|
||||
)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("[sql.UpsertTriggeredEndpointAlert] Failed to persist triggered alert for endpoint with key=%s: %s", ep.Key(), err.Error())
|
||||
return err
|
||||
}
|
||||
if err = tx.Commit(); err != nil {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteTriggeredEndpointAlert deletes a triggered alert for an endpoint
|
||||
func (s *Store) DeleteTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
|
||||
//log.Printf("[sql.DeleteTriggeredEndpointAlert] Deleting triggered alert with checksum=%s for endpoint with key=%s", triggeredAlert.Checksum(), ep.Key())
|
||||
_, err := s.db.Exec("DELETE FROM endpoint_alerts_triggered WHERE configuration_checksum = $1 AND endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $2 LIMIT 1)", triggeredAlert.Checksum(), ep.Key())
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteAllTriggeredAlertsNotInChecksumsByEndpoint removes all triggered alerts owned by an endpoint whose alert
|
||||
// configurations are not provided in the checksums list.
|
||||
// This prevents triggered alerts that have been removed or modified from lingering in the database.
|
||||
func (s *Store) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.Endpoint, checksums []string) int {
|
||||
//log.Printf("[sql.DeleteAllTriggeredAlertsNotInChecksumsByEndpoint] Deleting triggered alerts for endpoint with key=%s that do not belong to any of checksums=%v", ep.Key(), checksums)
|
||||
var err error
|
||||
var result sql.Result
|
||||
if len(checksums) == 0 {
|
||||
// No checksums? Then it means there are no (enabled) alerts configured for that endpoint, so we can get rid of all
|
||||
// persisted triggered alerts for that endpoint
|
||||
result, err = s.db.Exec("DELETE FROM endpoint_alerts_triggered WHERE endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1 LIMIT 1)", ep.Key())
|
||||
} else {
|
||||
args := make([]interface{}, 0, len(checksums)+1)
|
||||
args = append(args, ep.Key())
|
||||
query := `DELETE FROM endpoint_alerts_triggered
|
||||
WHERE endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1 LIMIT 1)
|
||||
AND configuration_checksum NOT IN (`
|
||||
for i := range checksums {
|
||||
query += fmt.Sprintf("$%d,", i+2)
|
||||
args = append(args, checksums[i])
|
||||
}
|
||||
query = query[:len(query)-1] + ")" // Remove the last comma and add the closing parenthesis
|
||||
result, err = s.db.Exec(query, args...)
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("[sql.DeleteAllTriggeredAlertsNotInChecksumsByEndpoint] Failed to delete rows for endpoint with key=%s that do not belong to any of checksums=%v: %s", ep.Key(), checksums, err.Error())
|
||||
return 0
|
||||
}
|
||||
// Return number of rows deleted
|
||||
rowsAffects, _ := result.RowsAffected()
|
||||
return int(rowsAffects)
|
||||
}
|
||||
|
||||
// Clear deletes everything from the store
|
||||
func (s *Store) Clear() {
|
||||
_, _ = s.db.Exec("DELETE FROM endpoints")
|
||||
|
@ -1,9 +1,11 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/TwiN/gatus/v5/alerting/alert"
|
||||
"github.com/TwiN/gatus/v5/config/endpoint"
|
||||
"github.com/TwiN/gatus/v5/storage/store/common"
|
||||
"github.com/TwiN/gatus/v5/storage/store/common/paging"
|
||||
@ -81,13 +83,13 @@ var (
|
||||
)
|
||||
|
||||
func TestNewStore(t *testing.T) {
|
||||
if _, err := NewStore("", "TestNewStore.db", false); err != ErrDatabaseDriverNotSpecified {
|
||||
if _, err := NewStore("", t.TempDir()+"/TestNewStore.db", false); !errors.Is(err, ErrDatabaseDriverNotSpecified) {
|
||||
t.Error("expected error due to blank driver parameter")
|
||||
}
|
||||
if _, err := NewStore("sqlite", "", false); err != ErrPathNotSpecified {
|
||||
if _, err := NewStore("sqlite", "", false); !errors.Is(err, ErrPathNotSpecified) {
|
||||
t.Error("expected error due to blank path parameter")
|
||||
}
|
||||
if store, err := NewStore("sqlite", t.TempDir()+"/TestNewStore.db", false); err != nil {
|
||||
if store, err := NewStore("sqlite", t.TempDir()+"/TestNewStore.db", true); err != nil {
|
||||
t.Error("shouldn't have returned any error, got", err.Error())
|
||||
} else {
|
||||
_ = store.db.Close()
|
||||
@ -168,6 +170,40 @@ func TestStore_InsertCleansUpEventsAndResultsProperly(t *testing.T) {
|
||||
store.Clear()
|
||||
}
|
||||
|
||||
func TestStore_InsertWithCaching(t *testing.T) {
|
||||
store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InsertWithCaching.db", true)
|
||||
defer store.Close()
|
||||
// Add 2 results
|
||||
store.Insert(&testEndpoint, &testSuccessfulResult)
|
||||
store.Insert(&testEndpoint, &testSuccessfulResult)
|
||||
// Verify that they exist
|
||||
endpointStatuses, _ := store.GetAllEndpointStatuses(paging.NewEndpointStatusParams().WithResults(1, 20))
|
||||
if numberOfEndpointStatuses := len(endpointStatuses); numberOfEndpointStatuses != 1 {
|
||||
t.Fatalf("expected 1 EndpointStatus, got %d", numberOfEndpointStatuses)
|
||||
}
|
||||
if len(endpointStatuses[0].Results) != 2 {
|
||||
t.Fatalf("expected 2 results, got %d", len(endpointStatuses[0].Results))
|
||||
}
|
||||
// Add 2 more results
|
||||
store.Insert(&testEndpoint, &testUnsuccessfulResult)
|
||||
store.Insert(&testEndpoint, &testUnsuccessfulResult)
|
||||
// Verify that they exist
|
||||
endpointStatuses, _ = store.GetAllEndpointStatuses(paging.NewEndpointStatusParams().WithResults(1, 20))
|
||||
if numberOfEndpointStatuses := len(endpointStatuses); numberOfEndpointStatuses != 1 {
|
||||
t.Fatalf("expected 1 EndpointStatus, got %d", numberOfEndpointStatuses)
|
||||
}
|
||||
if len(endpointStatuses[0].Results) != 4 {
|
||||
t.Fatalf("expected 4 results, got %d", len(endpointStatuses[0].Results))
|
||||
}
|
||||
// Clear the store, which should also clear the cache
|
||||
store.Clear()
|
||||
// Verify that they no longer exist
|
||||
endpointStatuses, _ = store.GetAllEndpointStatuses(paging.NewEndpointStatusParams().WithResults(1, 20))
|
||||
if numberOfEndpointStatuses := len(endpointStatuses); numberOfEndpointStatuses != 0 {
|
||||
t.Fatalf("expected 0 EndpointStatus, got %d", numberOfEndpointStatuses)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_Persistence(t *testing.T) {
|
||||
path := t.TempDir() + "/TestStore_Persistence.db"
|
||||
store, _ := NewStore("sqlite", path, false)
|
||||
@ -368,10 +404,10 @@ func TestStore_NoRows(t *testing.T) {
|
||||
defer store.Close()
|
||||
tx, _ := store.db.Begin()
|
||||
defer tx.Rollback()
|
||||
if _, err := store.getLastEndpointResultSuccessValue(tx, 1); err != errNoRowsReturned {
|
||||
if _, err := store.getLastEndpointResultSuccessValue(tx, 1); !errors.Is(err, errNoRowsReturned) {
|
||||
t.Errorf("should've %v, got %v", errNoRowsReturned, err)
|
||||
}
|
||||
if _, err := store.getAgeOfOldestEndpointUptimeEntry(tx, 1); err != errNoRowsReturned {
|
||||
if _, err := store.getAgeOfOldestEndpointUptimeEntry(tx, 1); !errors.Is(err, errNoRowsReturned) {
|
||||
t.Errorf("should've %v, got %v", errNoRowsReturned, err)
|
||||
}
|
||||
}
|
||||
@ -564,3 +600,131 @@ func TestCacheKey(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTriggeredEndpointAlertsPersistence(t *testing.T) {
|
||||
store, _ := NewStore("sqlite", t.TempDir()+"/TestTriggeredEndpointAlertsPersistence.db", false)
|
||||
defer store.Close()
|
||||
yes, desc := false, "description"
|
||||
ep := testEndpoint
|
||||
ep.NumberOfSuccessesInARow = 0
|
||||
alrt := &alert.Alert{
|
||||
Type: alert.TypePagerDuty,
|
||||
Enabled: &yes,
|
||||
FailureThreshold: 4,
|
||||
SuccessThreshold: 2,
|
||||
Description: &desc,
|
||||
SendOnResolved: &yes,
|
||||
Triggered: true,
|
||||
ResolveKey: "1234567",
|
||||
}
|
||||
// Alert just triggered, so NumberOfSuccessesInARow is 0
|
||||
if err := store.UpsertTriggeredEndpointAlert(&ep, alrt); err != nil {
|
||||
t.Fatal("expected no error, got", err.Error())
|
||||
}
|
||||
exists, resolveKey, numberOfSuccessesInARow, err := store.GetTriggeredEndpointAlert(&ep, alrt)
|
||||
if err != nil {
|
||||
t.Fatal("expected no error, got", err.Error())
|
||||
}
|
||||
if !exists {
|
||||
t.Error("expected triggered alert to exist")
|
||||
}
|
||||
if resolveKey != alrt.ResolveKey {
|
||||
t.Errorf("expected resolveKey %s, got %s", alrt.ResolveKey, resolveKey)
|
||||
}
|
||||
if numberOfSuccessesInARow != ep.NumberOfSuccessesInARow {
|
||||
t.Errorf("expected persisted NumberOfSuccessesInARow to be %d, got %d", ep.NumberOfSuccessesInARow, numberOfSuccessesInARow)
|
||||
}
|
||||
// Endpoint just had a successful evaluation, so NumberOfSuccessesInARow is now 1
|
||||
ep.NumberOfSuccessesInARow++
|
||||
if err := store.UpsertTriggeredEndpointAlert(&ep, alrt); err != nil {
|
||||
t.Fatal("expected no error, got", err.Error())
|
||||
}
|
||||
exists, resolveKey, numberOfSuccessesInARow, err = store.GetTriggeredEndpointAlert(&ep, alrt)
|
||||
if err != nil {
|
||||
t.Error("expected no error, got", err.Error())
|
||||
}
|
||||
if !exists {
|
||||
t.Error("expected triggered alert to exist")
|
||||
}
|
||||
if resolveKey != alrt.ResolveKey {
|
||||
t.Errorf("expected resolveKey %s, got %s", alrt.ResolveKey, resolveKey)
|
||||
}
|
||||
if numberOfSuccessesInARow != ep.NumberOfSuccessesInARow {
|
||||
t.Errorf("expected persisted NumberOfSuccessesInARow to be %d, got %d", ep.NumberOfSuccessesInARow, numberOfSuccessesInARow)
|
||||
}
|
||||
// Simulate the endpoint having another successful evaluation, which means the alert is now resolved,
|
||||
// and we should delete the triggered alert from the store
|
||||
ep.NumberOfSuccessesInARow++
|
||||
if err := store.DeleteTriggeredEndpointAlert(&ep, alrt); err != nil {
|
||||
t.Fatal("expected no error, got", err.Error())
|
||||
}
|
||||
exists, _, _, err = store.GetTriggeredEndpointAlert(&ep, alrt)
|
||||
if err != nil {
|
||||
t.Error("expected no error, got", err.Error())
|
||||
}
|
||||
if exists {
|
||||
t.Error("expected triggered alert to no longer exist as it has been deleted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(t *testing.T) {
|
||||
store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_DeleteAllTriggeredAlertsNotInChecksumsByEndpoint.db", false)
|
||||
defer store.Close()
|
||||
yes, desc := false, "description"
|
||||
ep1 := testEndpoint
|
||||
ep1.Name = "ep1"
|
||||
ep2 := testEndpoint
|
||||
ep2.Name = "ep2"
|
||||
alert1 := alert.Alert{
|
||||
Type: alert.TypePagerDuty,
|
||||
Enabled: &yes,
|
||||
FailureThreshold: 4,
|
||||
SuccessThreshold: 2,
|
||||
Description: &desc,
|
||||
SendOnResolved: &yes,
|
||||
Triggered: true,
|
||||
ResolveKey: "1234567",
|
||||
}
|
||||
alert2 := alert1
|
||||
alert2.Type, alert2.ResolveKey = alert.TypeSlack, ""
|
||||
alert3 := alert2
|
||||
if err := store.UpsertTriggeredEndpointAlert(&ep1, &alert1); err != nil {
|
||||
t.Fatal("expected no error, got", err.Error())
|
||||
}
|
||||
if err := store.UpsertTriggeredEndpointAlert(&ep1, &alert2); err != nil {
|
||||
t.Fatal("expected no error, got", err.Error())
|
||||
}
|
||||
if err := store.UpsertTriggeredEndpointAlert(&ep2, &alert3); err != nil {
|
||||
t.Fatal("expected no error, got", err.Error())
|
||||
}
|
||||
if exists, _, _, _ := store.GetTriggeredEndpointAlert(&ep1, &alert1); !exists {
|
||||
t.Error("expected alert1 to have been deleted")
|
||||
}
|
||||
if exists, _, _, _ := store.GetTriggeredEndpointAlert(&ep1, &alert2); !exists {
|
||||
t.Error("expected alert2 to exist for ep1")
|
||||
}
|
||||
if exists, _, _, _ := store.GetTriggeredEndpointAlert(&ep2, &alert3); !exists {
|
||||
t.Error("expected alert3 to exist for ep2")
|
||||
}
|
||||
// Now we simulate the alert configuration being updated, and the alert being resolved
|
||||
if deleted := store.DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(&ep1, []string{alert2.Checksum()}); deleted != 1 {
|
||||
t.Errorf("expected 1 triggered alert to be deleted, got %d", deleted)
|
||||
}
|
||||
if exists, _, _, _ := store.GetTriggeredEndpointAlert(&ep1, &alert1); exists {
|
||||
t.Error("expected alert1 to have been deleted")
|
||||
}
|
||||
if exists, _, _, _ := store.GetTriggeredEndpointAlert(&ep1, &alert2); !exists {
|
||||
t.Error("expected alert2 to exist for ep1")
|
||||
}
|
||||
if exists, _, _, _ := store.GetTriggeredEndpointAlert(&ep2, &alert3); !exists {
|
||||
t.Error("expected alert3 to exist for ep2")
|
||||
}
|
||||
// Now let's just assume all alerts for ep1 were removed
|
||||
if deleted := store.DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(&ep1, []string{}); deleted != 1 {
|
||||
t.Errorf("expected 1 triggered alert to be deleted, got %d", deleted)
|
||||
}
|
||||
// Make sure the alert for ep2 still exists
|
||||
if exists, _, _, _ := store.GetTriggeredEndpointAlert(&ep2, &alert3); !exists {
|
||||
t.Error("expected alert3 to exist for ep2")
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/TwiN/gatus/v5/alerting/alert"
|
||||
"github.com/TwiN/gatus/v5/config/endpoint"
|
||||
"github.com/TwiN/gatus/v5/storage"
|
||||
"github.com/TwiN/gatus/v5/storage/store/common/paging"
|
||||
@ -41,6 +42,21 @@ type Store interface {
|
||||
// Used to delete endpoints that have been persisted but are no longer part of the configured endpoints
|
||||
DeleteAllEndpointStatusesNotInKeys(keys []string) int
|
||||
|
||||
// GetTriggeredEndpointAlert returns whether the triggered alert for the specified endpoint as well as the necessary information to resolve it
|
||||
GetTriggeredEndpointAlert(ep *endpoint.Endpoint, alert *alert.Alert) (exists bool, resolveKey string, numberOfSuccessesInARow int, err error)
|
||||
|
||||
// UpsertTriggeredEndpointAlert inserts/updates a triggered alert for an endpoint
|
||||
// Used for persistence of triggered alerts across application restarts
|
||||
UpsertTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error
|
||||
|
||||
// DeleteTriggeredEndpointAlert deletes a triggered alert for an endpoint
|
||||
DeleteTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error
|
||||
|
||||
// DeleteAllTriggeredAlertsNotInChecksumsByEndpoint removes all triggered alerts owned by an endpoint whose alert
|
||||
// configurations are not provided in the checksums list.
|
||||
// This prevents triggered alerts that have been removed or modified from lingering in the database.
|
||||
DeleteAllTriggeredAlertsNotInChecksumsByEndpoint(ep *endpoint.Endpoint, checksums []string) int
|
||||
|
||||
// Clear deletes everything from the store
|
||||
Clear()
|
||||
|
||||
|
Reference in New Issue
Block a user