Implement service uptime support for database store

This commit is contained in:
TwinProduction
2021-07-14 01:40:27 -04:00
committed by Chris
parent 943d0a19d1
commit a4c69d6fc3
2 changed files with 282 additions and 18 deletions

View File

@ -18,6 +18,12 @@ import (
const (
arraySeparator = "|~|"
uptimeCleanUpThreshold = 10 * 24 * time.Hour // Maximum uptime age before triggering a clean up
eventsCleanUpThreshold = core.MaximumNumberOfEvents * 2 // Maximum number of events before triggering a clean up
resultsCleanUpThreshold = core.MaximumNumberOfResults * 2 // Maximum number of results before triggering a clean up
uptimeRetention = 7 * 24 * time.Hour
)
var (
@ -228,16 +234,14 @@ func (s *Store) Insert(service *core.Service, result *core.Result) {
log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error())
}
event := generateEventBasedOnResult(result)
err = s.insertEvent(tx, serviceID, event)
if err != nil {
if err = s.insertEvent(tx, serviceID, event); err != nil {
// Silently fail
log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error())
}
} else {
// Get the success value of the previous result
var lastResultSuccess bool
lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID)
if err != nil {
if lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID); err != nil {
log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
} else {
// If we managed to retrieve the outcome of the previous result, we'll compare it with the new result.
@ -246,8 +250,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) {
// an event to mark the change in state
if lastResultSuccess != result.Success {
event := generateEventBasedOnResult(result)
err = s.insertEvent(tx, serviceID, event)
if err != nil {
if err = s.insertEvent(tx, serviceID, event); err != nil {
// Silently fail
log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error())
}
@ -256,29 +259,43 @@ func (s *Store) Insert(service *core.Service, result *core.Result) {
// Clean up old events if there's more than twice the maximum number of events
// This lets us both keep the table clean without impacting performance too much
// (since we're only deleting MaximumNumberOfEvents at a time instead of 1)
if numberOfEvents > core.MaximumNumberOfEvents*2 {
err = s.deleteOldServiceEvents(tx, serviceID)
if err != nil {
if numberOfEvents > resultsCleanUpThreshold {
if err = s.deleteOldServiceEvents(tx, serviceID); err != nil {
log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
}
}
}
// Second, we need to insert the result.
err = s.insertResult(tx, serviceID, result)
if err != nil {
if err = s.insertResult(tx, serviceID, result); err != nil {
log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
_ = tx.Rollback()
_ = tx.Rollback() // If we can't insert the result, we'll rollback now since there's no point continuing
return
}
// Clean up old results
numberOfResults, err := s.getNumberOfResultsByServiceID(tx, serviceID)
if err != nil {
log.Printf("[database][Insert] Failed to retrieve total number of results for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
} else {
if numberOfResults > eventsCleanUpThreshold {
if err = s.deleteOldServiceResults(tx, serviceID); err != nil {
log.Printf("[database][Insert] Failed to delete old results for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
}
}
}
if numberOfResults > core.MaximumNumberOfResults*2 {
err = s.deleteOldServiceResults(tx, serviceID)
if err != nil {
log.Printf("[database][Insert] Failed to delete old results for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
// Finally, we need to insert the uptime data.
// Because the uptime data significantly outlives the results, we can't rely on the results for determining the uptime
if err = s.updateServiceUptime(tx, serviceID, result); err != nil {
log.Printf("[database][Insert] Failed to update uptime for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
}
// Clean up old uptime entries
ageOfOldestUptimeEntry, err := s.getAgeOfOldestServiceUptimeEntry(tx, serviceID)
if err != nil {
log.Printf("[database][Insert] Failed to retrieve oldest service uptime entry for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
} else {
if ageOfOldestUptimeEntry > uptimeCleanUpThreshold {
if err = s.deleteOldUptimeEntries(tx, serviceID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil {
log.Printf("[database][Insert] Failed to delete old uptime entries for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
}
}
}
//log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds())
@ -381,6 +398,33 @@ func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, condit
return nil
}
func (s *Store) updateServiceUptime(tx *sql.Tx, serviceID int64, result *core.Result) error {
unixTimestampFlooredAtHour := result.Timestamp.Truncate(time.Hour).Unix()
var successfulExecutions int
if result.Success {
successfulExecutions = 1
}
_, err := tx.Exec(
`
INSERT INTO service_uptime (service_id, hour_unix_timestamp, total_executions, successful_executions, total_response_time)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT(service_id, hour_unix_timestamp) DO UPDATE SET
total_executions = excluded.total_executions + total_executions,
successful_executions = excluded.successful_executions + successful_executions,
total_response_time = excluded.total_response_time + total_response_time
`,
serviceID,
unixTimestampFlooredAtHour,
1,
successfulExecutions,
result.Duration.Milliseconds(),
)
if err != nil {
return err
}
return nil
}
func (s *Store) getAllServiceKeys(tx *sql.Tx) (keys []string, err error) {
rows, err := tx.Query("SELECT service_key FROM service")
if err != nil {
@ -404,7 +448,7 @@ func (s *Store) getServiceStatusByKey(tx *sql.Tx, key string, eventsPage, events
Name: serviceName,
Group: serviceGroup,
Key: key,
Uptime: nil,
Uptime: &core.Uptime{},
}
if eventsPageSize > 0 {
if serviceStatus.Events, err = s.getEventsByServiceID(tx, serviceID, eventsPage, eventsPageSize); err != nil {
@ -417,7 +461,10 @@ func (s *Store) getServiceStatusByKey(tx *sql.Tx, key string, eventsPage, events
}
}
if includeUptime {
// TODO
now := time.Now()
serviceStatus.Uptime.LastHour, _, _ = s.getServiceUptime(tx, serviceID, now.Add(-time.Hour), now)
serviceStatus.Uptime.LastTwentyFourHours, _, _ = s.getServiceUptime(tx, serviceID, now.Add(-24*time.Hour), now)
serviceStatus.Uptime.LastSevenDays, _, _ = s.getServiceUptime(tx, serviceID, now.Add(-7*24*time.Hour), now)
}
return serviceStatus, nil
}
@ -512,6 +559,35 @@ func (s *Store) getResultsByServiceID(tx *sql.Tx, serviceID int64, page, pageSiz
return
}
func (s *Store) getServiceUptime(tx *sql.Tx, serviceID int64, from, to time.Time) (uptime float64, avgResponseTime time.Duration, err error) {
rows, err := tx.Query(
`
SELECT SUM(total_executions), SUM(successful_executions), SUM(total_response_time)
FROM service_uptime
WHERE service_id = $1
AND hour_unix_timestamp >= $2
AND hour_unix_timestamp <= $3
`,
serviceID,
from.Unix(),
to.Unix(),
)
if err != nil {
return 0, 0, err
}
var totalExecutions, totalSuccessfulExecutions, totalResponseTime int
for rows.Next() {
_ = rows.Scan(&totalExecutions, &totalSuccessfulExecutions, &totalResponseTime)
break
}
_ = rows.Close()
if totalExecutions > 0 {
uptime = float64(totalSuccessfulExecutions) / float64(totalExecutions)
avgResponseTime = time.Duration(float64(totalResponseTime)/float64(totalExecutions)) * time.Millisecond
}
return
}
func (s *Store) getServiceID(tx *sql.Tx, service *core.Service) (int64, error) {
rows, err := tx.Query("SELECT service_id FROM service WHERE service_key = $1", service.Key())
if err != nil {
@ -569,6 +645,34 @@ func (s *Store) getNumberOfResultsByServiceID(tx *sql.Tx, serviceID int64) (int6
return numberOfResults, nil
}
func (s *Store) getAgeOfOldestServiceUptimeEntry(tx *sql.Tx, serviceID int64) (time.Duration, error) {
rows, err := tx.Query(
`
SELECT hour_unix_timestamp
FROM service_uptime
WHERE service_id = $1
ORDER BY hour_unix_timestamp
LIMIT 1
`,
serviceID,
)
if err != nil {
return 0, err
}
var oldestServiceUptimeUnixTimestamp int64
var found bool
for rows.Next() {
_ = rows.Scan(&oldestServiceUptimeUnixTimestamp)
found = true
break
}
_ = rows.Close()
if !found {
return 0, errNoRowsReturned
}
return time.Since(time.Unix(oldestServiceUptimeUnixTimestamp, 0)), nil
}
func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (bool, error) {
rows, err := tx.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY service_result_id DESC LIMIT 1", serviceID)
if err != nil {
@ -637,3 +741,13 @@ func (s *Store) deleteOldServiceResults(tx *sql.Tx, serviceID int64) error {
//log.Printf("deleted %d rows from service_result", rowsAffected)
return nil
}
func (s *Store) deleteOldUptimeEntries(tx *sql.Tx, serviceID int64, maxAge time.Time) error {
_, err := tx.Exec("DELETE FROM service_uptime WHERE service_id = $1 AND hour_unix_timestamp < $2", serviceID, maxAge.Unix())
//if err != nil {
// return err
//}
//rowsAffected, _ := result.RowsAffected()
//log.Printf("deleted %d rows from service_uptime", rowsAffected)
return err
}