From bc42d15625b66b05d29067f1bc11473f782a8941 Mon Sep 17 00:00:00 2001 From: TwinProduction Date: Mon, 12 Jul 2021 04:25:25 -0400 Subject: [PATCH] Automatically clear up old events --- storage/store/database/database.go | 101 ++++++++++++++++++++++------- 1 file changed, 77 insertions(+), 24 deletions(-) diff --git a/storage/store/database/database.go b/storage/store/database/database.go index f67f1f30..5bdda4ba 100644 --- a/storage/store/database/database.go +++ b/storage/store/database/database.go @@ -4,12 +4,18 @@ import ( "database/sql" "errors" "log" + "strings" + "time" "github.com/TwinProduction/gatus/core" "github.com/TwinProduction/gatus/util" _ "modernc.org/sqlite" ) +const ( + arraySeparator = "|~|" +) + var ( // ErrFilePathNotSpecified is the error returned when path parameter passed in NewStore is blank ErrFilePathNotSpecified = errors.New("file path cannot be empty") @@ -105,6 +111,8 @@ func (s *Store) createSchema() error { // TODO: add parameter event and uptime & only fetch them if necessary +// GetAllServiceStatusesWithResultPagination returns all monitored core.ServiceStatus +// with a subset of core.Result defined by the page and pageSize parameters func (s *Store) GetAllServiceStatusesWithResultPagination(page, pageSize int) map[string]*core.ServiceStatus { panic("implement me") } @@ -121,17 +129,17 @@ func (s *Store) GetServiceStatusByKey(key string) *core.ServiceStatus { return nil } serviceStatus := &core.ServiceStatus{ - Name: serviceName, - Group: serviceGroup, - Key: key, - Results: nil, - Events: nil, - Uptime: nil, + Name: serviceName, + Group: serviceGroup, + Key: key, + Uptime: nil, } if serviceStatus.Events, err = s.getEventsByServiceID(serviceID); err != nil { log.Printf("[database][GetServiceStatusByKey] Failed to retrieve events for key=%s: %s", key, err.Error()) } serviceStatus.Results, err = s.getResultsByServiceID(serviceID) + // TODO: populate Uptime + // TODO: add flag to decide whether to retrieve uptime or not return serviceStatus } @@ -150,7 +158,7 @@ func (s *Store) getServiceIDNameAndGroupByKey(key string) (id int64, group, name } func (s *Store) getEventsByServiceID(serviceID int64) (events []*core.Event, err error) { - rows, err := s.db.Query("SELECT event_type, event_timestamp FROM service_event WHERE service_id = $1 ORDER BY event_timestamp DESC LIMIT $2", serviceID, core.MaximumNumberOfEvents) + rows, err := s.db.Query("SELECT event_type, event_timestamp FROM service_event WHERE service_id = $1 ORDER BY service_event_id DESC LIMIT $2", serviceID, core.MaximumNumberOfEvents) if err != nil { return nil, err } @@ -164,8 +172,9 @@ func (s *Store) getEventsByServiceID(serviceID int64) (events []*core.Event, err } func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, err error) { - rows, err := s.db.Query(` - SELECT service_result_id, success, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp + rows, err := s.db.Query( + ` + SELECT service_result_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp FROM service_result WHERE service_id = $1 ORDER BY timestamp DESC @@ -180,7 +189,9 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, for rows.Next() { result := &core.Result{} var id int64 - _ = rows.Scan(&id, &result.Success, &result.Connected, &result.HTTPStatus, &result.DNSRCode, &result.CertificateExpiration, &result.Hostname, &result.IP, &result.Duration, &result.Timestamp) + var joinedErrors string + _ = rows.Scan(&id, &result.Success, &joinedErrors, &result.Connected, &result.HTTPStatus, &result.DNSRCode, &result.CertificateExpiration, &result.Hostname, &result.IP, &result.Duration, &result.Timestamp) + result.Errors = strings.Split(joinedErrors, arraySeparator) results = append(results, result) idResultMap[id] = result } @@ -191,10 +202,12 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, return } for serviceResultID, result := range idResultMap { - rows, err = transaction.Query(` - SELECT service_result_id, condition, success - FROM service_result_condition - WHERE service_result_id = $1`, + rows, err = transaction.Query( + ` + SELECT service_result_id, condition, success + FROM service_result_condition + WHERE service_result_id = $1 + `, serviceResultID, ) if err != nil { @@ -248,29 +261,40 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { if err != nil { return } + + // Clean up old events if there's more than twice the maximum number of events + // This lets us both keep the table clean without impacting performance too much + // (since we're only deleting MaximumNumberOfEvents at a time instead of 1) + if numberOfEvents > core.MaximumNumberOfEvents*2 { + err = s.deleteOldEvents(tx, serviceID) + if err != nil { + log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } + } + //log.Printf("there are currently %d events", numberOfEvents) if numberOfEvents == 0 { // There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event err = s.insertEvent(tx, serviceID, &core.Event{ Type: core.EventStart, - Timestamp: result.Timestamp.Add(-result.Duration), + Timestamp: result.Timestamp.Add(-50 * time.Millisecond), }) if err != nil { // Silently fail - log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Name, service.Group, err.Error()) + log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error()) } event := generateEventBasedOnResult(result) err = s.insertEvent(tx, serviceID, event) if err != nil { // Silently fail - log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Name, service.Group, err.Error()) + log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) } } else { // Get the success value of the previous result var lastResultSuccess bool lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) if err != nil { - log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Name, service.Group, err.Error()) + log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) } else { // If we managed to retrieve the outcome of the previous result, we'll compare it with the new result. // If the final outcome (success or failure) of the previous and the new result aren't the same, it means @@ -281,7 +305,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { err = s.insertEvent(tx, serviceID, event) if err != nil { // Silently fail - log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Name, service.Group, err.Error()) + log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) } } } @@ -290,7 +314,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { err = s.insertResult(tx, serviceID, result) if err != nil { // Silently fail - log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Name, service.Group, err.Error()) + log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) } //log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) err = tx.Commit() @@ -372,7 +396,7 @@ func (s *Store) insertEvent(tx *sql.Tx, serviceID int64, event *core.Event) erro } func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (bool, error) { - rows, err := tx.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY timestamp LIMIT 1", serviceID) + rows, err := tx.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY service_result_id DESC LIMIT 1", serviceID) if err != nil { return false, err } @@ -392,11 +416,14 @@ func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (b // insertResult inserts a result in the store func (s *Store) insertResult(tx *sql.Tx, serviceID int64, result *core.Result) error { - res, err := tx.Exec(` - INSERT INTO service_result (service_id, success, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + res, err := tx.Exec( + ` + INSERT INTO service_result (service_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + `, serviceID, result.Success, + strings.Join(result.Errors, arraySeparator), result.Connected, result.HTTPStatus, result.DNSRCode, @@ -432,6 +459,31 @@ func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, condit return nil } +// insertService inserts a service in the store and returns the generated id of said service +func (s *Store) deleteOldEvents(tx *sql.Tx, serviceID int64) error { + result, err := tx.Exec( + ` + DELETE FROM service_event + WHERE service_id = $1 + AND service_event_id NOT IN ( + SELECT service_event_id + FROM service_event + WHERE service_id = $1 + ORDER BY service_event_id DESC + LIMIT $2 + ) + `, + serviceID, + core.MaximumNumberOfEvents, + ) + if err != nil { + return err + } + rowsAffected, _ := result.RowsAffected() + log.Printf("deleted %d rows", rowsAffected) + return nil +} + // DeleteAllServiceStatusesNotInKeys removes all rows owned by a service whose key is not within the keys provided func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int { panic("implement me") @@ -447,6 +499,7 @@ func (s *Store) Save() error { return nil } +// Close the database handle func (s *Store) Close() { _ = s.db.Close() }