feat(storage): Support 30d badges (#836)
* feat(storage): Add support for 30d uptime badge

  Fix #714

* Fix typo
* Fix test
* Fix typo
* Improve implementation
* Add check in existing test
* Add extra test to ensure functionality works
* Add support for 30d response time chart too
@@ -28,11 +28,13 @@ const (
 	// for aesthetic purposes, I deemed it wasn't worth the performance impact of yet another one-to-many table.
 	arraySeparator = "|~|"
 
-	uptimeCleanUpThreshold = 10 * 24 * time.Hour // Maximum uptime age before triggering a cleanup
 	eventsCleanUpThreshold = common.MaximumNumberOfEvents + 10 // Maximum number of events before triggering a cleanup
 	resultsCleanUpThreshold = common.MaximumNumberOfResults + 10 // Maximum number of results before triggering a cleanup
 
-	uptimeRetention = 7 * 24 * time.Hour
+	uptimeTotalEntriesMergeThreshold = 100 // Maximum number of uptime entries before triggering a merge
+	uptimeAgeCleanUpThreshold = 32 * 24 * time.Hour // Maximum uptime age before triggering a cleanup
+	uptimeRetention = 30 * 24 * time.Hour // Minimum duration that must be kept to operate as intended
+	uptimeHourlyBuffer = 48 * time.Hour // Number of hours to buffer from now when determining which hourly uptime entries can be merged into daily uptime entries
 
 	cacheTTL = 10 * time.Minute
 )
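Editor's note: a minimal, self-contained sketch (not part of the commit) of how uptimeHourlyBuffer translates into the merge threshold computed later in this diff. The example time is arbitrary; anything older than the floored threshold is at least 48 hours old, so merging it into a daily entry can never affect the 24h badge.

package main

import (
	"fmt"
	"time"
)

const uptimeHourlyBuffer = 48 * time.Hour // mirrors the constant added above

func main() {
	now := time.Date(2023, time.March, 15, 17, 30, 0, 0, time.UTC) // arbitrary "now"
	minThreshold := now.Add(-uptimeHourlyBuffer)                   // 2023-03-13 17:30:00 UTC
	// Floor to midnight of that day, as the merge function below does:
	minThreshold = time.Date(minThreshold.Year(), minThreshold.Month(), minThreshold.Day(), 0, 0, 0, 0, minThreshold.Location())
	fmt.Println(minThreshold) // 2023-03-13 00:00:00 +0000 UTC
}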
@@ -322,12 +324,27 @@ func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
 	if err = s.updateEndpointUptime(tx, endpointID, result); err != nil {
 		log.Printf("[sql.Insert] Failed to update uptime for endpoint with key=%s: %s", ep.Key(), err.Error())
 	}
-	// Clean up old uptime entries
+	// Merge hourly uptime entries that can be merged into daily entries and clean up old uptime entries
+	numberOfUptimeEntries, err := s.getNumberOfUptimeEntriesByEndpointID(tx, endpointID)
+	if err != nil {
+		log.Printf("[sql.Insert] Failed to retrieve total number of uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
+	} else {
+		// Merge older hourly uptime entries into daily uptime entries if we have more than uptimeTotalEntriesMergeThreshold
+		if numberOfUptimeEntries >= uptimeTotalEntriesMergeThreshold {
+			log.Printf("[sql.Insert] Merging hourly uptime entries for endpoint with key=%s; This is a lot of work, it shouldn't happen too often", ep.Key())
+			if err = s.mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries(tx, endpointID); err != nil {
+				log.Printf("[sql.Insert] Failed to merge hourly uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
+			}
+		}
+	}
+	// Clean up outdated uptime entries
+	// In most cases, this would be handled by mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries,
+	// but if Gatus was temporarily shut down, we might have some old entries that need to be cleaned up
 	ageOfOldestUptimeEntry, err := s.getAgeOfOldestEndpointUptimeEntry(tx, endpointID)
 	if err != nil {
 		log.Printf("[sql.Insert] Failed to retrieve oldest endpoint uptime entry for endpoint with key=%s: %s", ep.Key(), err.Error())
 	} else {
-		if ageOfOldestUptimeEntry > uptimeCleanUpThreshold {
+		if ageOfOldestUptimeEntry > uptimeAgeCleanUpThreshold {
 			if err = s.deleteOldUptimeEntries(tx, endpointID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil {
 				log.Printf("[sql.Insert] Failed to delete old uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
 			}
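Editor's note: condensed, the maintenance pass added to Insert boils down to two independent checks. Below is a hypothetical restatement, not the store's actual API: maintainUptimeEntries, merge, and deleteOlderThan are stand-ins for values and methods the real code derives inside the transaction, and the real code logs failures and carries on rather than returning early.

package main

import (
	"fmt"
	"time"
)

// Constants copied from the first hunk of this diff.
const (
	uptimeTotalEntriesMergeThreshold = 100
	uptimeAgeCleanUpThreshold        = 32 * 24 * time.Hour
	uptimeRetention                  = 30 * 24 * time.Hour
)

// maintainUptimeEntries is an editor's condensation of the flow above.
func maintainUptimeEntries(count int64, oldestAge time.Duration, merge func() error, deleteOlderThan func(time.Time) error) error {
	// 1. Too many rows? Fold hourly rows older than the buffer into daily rows.
	if count >= uptimeTotalEntriesMergeThreshold {
		if err := merge(); err != nil {
			return err
		}
	}
	// 2. Oldest row past 32d? Trim to slightly more than the 30d the badges need,
	// so the 30d window always stays fully covered.
	if oldestAge > uptimeAgeCleanUpThreshold {
		return deleteOlderThan(time.Now().Add(-(uptimeRetention + time.Hour)))
	}
	return nil
}

func main() {
	err := maintainUptimeEntries(
		120,             // hypothetical row count: above the merge threshold
		33*24*time.Hour, // hypothetical oldest-row age: past the cleanup threshold
		func() error { fmt.Println("would merge hourly entries into daily entries"); return nil },
		func(cutoff time.Time) error { fmt.Println("would delete entries older than", cutoff); return nil },
	)
	fmt.Println("maintenance error:", err)
}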
@@ -865,6 +882,12 @@ func (s *Store) getNumberOfResultsByEndpointID(tx *sql.Tx, endpointID int64) (in
 	return numberOfResults, err
 }
 
+func (s *Store) getNumberOfUptimeEntriesByEndpointID(tx *sql.Tx, endpointID int64) (int64, error) {
+	var numberOfUptimeEntries int64
+	err := tx.QueryRow("SELECT COUNT(1) FROM endpoint_uptimes WHERE endpoint_id = $1", endpointID).Scan(&numberOfUptimeEntries)
+	return numberOfUptimeEntries, err
+}
+
 func (s *Store) getAgeOfOldestEndpointUptimeEntry(tx *sql.Tx, endpointID int64) (time.Duration, error) {
 	rows, err := tx.Query(
 		`
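Editor's note: the new helper is a plain QueryRow/Scan count inside the Insert transaction. A standalone equivalent against a throwaway in-memory database (the sqlite driver choice and sample row are the editor's assumptions; the schema columns are taken from the INSERT statement later in this diff):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // driver choice is an assumption, not from the commit
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	db.Exec("CREATE TABLE endpoint_uptimes (endpoint_id INTEGER, hour_unix_timestamp INTEGER, total_executions INTEGER, successful_executions INTEGER, total_response_time INTEGER)")
	db.Exec("INSERT INTO endpoint_uptimes VALUES (1, 1700000000, 10, 9, 1200)")
	var numberOfUptimeEntries int64
	if err := db.QueryRow("SELECT COUNT(1) FROM endpoint_uptimes WHERE endpoint_id = $1", 1).Scan(&numberOfUptimeEntries); err != nil {
		panic(err)
	}
	fmt.Println(numberOfUptimeEntries) // 1
}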
@@ -948,6 +971,92 @@ func (s *Store) deleteOldUptimeEntries(tx *sql.Tx, endpointID int64, maxAge time
 	return err
 }
 
+// mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries merges all hourly uptime entries older than
+// uptimeHourlyBuffer from now into daily uptime entries by summing all hourly entries of the same day into a
+// single entry.
+//
+// This effectively limits the number of uptime entries to 48+(n-2), where 48 covers the most recent 48 hours of
+// hourly entries (defined by uptimeHourlyBuffer) and n is the number of days retained, each day older than 48 hours
+// contributing a single daily entry.
+// Supporting 30d of entries would then result in far fewer than 24*30=720 entries.
+func (s *Store) mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries(tx *sql.Tx, endpointID int64) error {
+	// Calculate the timestamp of the first full day of uptime entries that would not impact the uptime calculation for 24h badges.
+	// The logic is that once at least 48 hours have passed, we:
+	// - No longer need to worry about keeping hourly entries
+	// - Don't have to worry about new hourly entries being inserted, as the day has already passed
+	// which implies that no matter what hour of the day it is, any timestamp + 48h floored to the current day
+	// will never impact the 24h uptime badge calculation
+	now := time.Now()
+	minThreshold := now.Add(-uptimeHourlyBuffer)
+	minThreshold = time.Date(minThreshold.Year(), minThreshold.Month(), minThreshold.Day(), 0, 0, 0, 0, minThreshold.Location())
+	maxThreshold := now.Add(-uptimeRetention)
+	// Get all uptime entries within the retention window that are older than the merge threshold
+	rows, err := tx.Query(
+		`
+			SELECT hour_unix_timestamp, total_executions, successful_executions, total_response_time
+			FROM endpoint_uptimes
+			WHERE endpoint_id = $1
+				AND hour_unix_timestamp < $2
+				AND hour_unix_timestamp >= $3
+		`,
+		endpointID,
+		minThreshold.Unix(),
+		maxThreshold.Unix(),
+	)
+	if err != nil {
+		return err
+	}
+	type Entry struct {
+		totalExecutions      int
+		successfulExecutions int
+		totalResponseTime    int
+	}
+	dailyEntries := make(map[int64]*Entry)
+	for rows.Next() {
+		var unixTimestamp int64
+		entry := Entry{}
+		if err = rows.Scan(&unixTimestamp, &entry.totalExecutions, &entry.successfulExecutions, &entry.totalResponseTime); err != nil {
+			return err
+		}
+		timestamp := time.Unix(unixTimestamp, 0)
+		unixTimestampFlooredAtDay := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), 0, 0, 0, 0, timestamp.Location()).Unix()
+		if dailyEntry := dailyEntries[unixTimestampFlooredAtDay]; dailyEntry == nil {
+			dailyEntries[unixTimestampFlooredAtDay] = &entry
+		} else {
+			dailyEntries[unixTimestampFlooredAtDay].totalExecutions += entry.totalExecutions
+			dailyEntries[unixTimestampFlooredAtDay].successfulExecutions += entry.successfulExecutions
+			dailyEntries[unixTimestampFlooredAtDay].totalResponseTime += entry.totalResponseTime
+		}
+	}
+	// Delete the older hourly uptime entries
+	_, err = tx.Exec("DELETE FROM endpoint_uptimes WHERE endpoint_id = $1 AND hour_unix_timestamp < $2", endpointID, minThreshold.Unix())
+	if err != nil {
+		return err
+	}
+	// Insert the new daily uptime entries
+	for unixTimestamp, entry := range dailyEntries {
+		_, err = tx.Exec(
+			`
+				INSERT INTO endpoint_uptimes (endpoint_id, hour_unix_timestamp, total_executions, successful_executions, total_response_time)
+				VALUES ($1, $2, $3, $4, $5)
+				ON CONFLICT(endpoint_id, hour_unix_timestamp) DO UPDATE SET
+					total_executions = $3,
+					successful_executions = $4,
+					total_response_time = $5
+			`,
+			endpointID,
+			unixTimestamp,
+			entry.totalExecutions,
+			entry.successfulExecutions,
+			entry.totalResponseTime,
+		)
+		if err != nil {
+			return err
+		}
+	}
+	// TODO: Find a way to ignore entries that were already merged?
+	return nil
+}
+
 func generateCacheKey(endpointKey string, p *paging.EndpointStatusParams) string {
 	return fmt.Sprintf("%s-%d-%d-%d-%d", endpointKey, p.EventsPage, p.EventsPageSize, p.ResultsPage, p.ResultsPageSize)
 }
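Editor's note: a quick sanity check of the 48+(n-2) bound from the merge function's doc comment above (the arithmetic is the editor's, not from the commit):

package main

import "fmt"

func main() {
	const retentionDays = 30   // matches uptimeRetention of 30 * 24h
	const hourlyBufferDays = 2 // matches uptimeHourlyBuffer of 48h
	// The newest ~48 hours stay hourly; every older day collapses to one daily row.
	maxRows := 24*hourlyBufferDays + (retentionDays - hourlyBufferDays) // 48 + 28
	fmt.Printf("~%d rows per endpoint instead of %d\n", maxRows, 24*retentionDays)
	// Output: ~76 rows per endpoint instead of 720
}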