Automatically clear up old events
This commit is contained in:
		| @ -4,12 +4,18 @@ import ( | ||||
| 	"database/sql" | ||||
| 	"errors" | ||||
| 	"log" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/TwinProduction/gatus/core" | ||||
| 	"github.com/TwinProduction/gatus/util" | ||||
| 	_ "modernc.org/sqlite" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	arraySeparator = "|~|" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	// ErrFilePathNotSpecified is the error returned when path parameter passed in NewStore is blank | ||||
| 	ErrFilePathNotSpecified = errors.New("file path cannot be empty") | ||||
| @ -105,6 +111,8 @@ func (s *Store) createSchema() error { | ||||
|  | ||||
| // TODO: add parameter event and uptime & only fetch them if necessary | ||||
|  | ||||
| // GetAllServiceStatusesWithResultPagination returns all monitored core.ServiceStatus | ||||
| // with a subset of core.Result defined by the page and pageSize parameters | ||||
| func (s *Store) GetAllServiceStatusesWithResultPagination(page, pageSize int) map[string]*core.ServiceStatus { | ||||
| 	panic("implement me") | ||||
| } | ||||
| @ -121,17 +129,17 @@ func (s *Store) GetServiceStatusByKey(key string) *core.ServiceStatus { | ||||
| 		return nil | ||||
| 	} | ||||
| 	serviceStatus := &core.ServiceStatus{ | ||||
| 		Name:    serviceName, | ||||
| 		Group:   serviceGroup, | ||||
| 		Key:     key, | ||||
| 		Results: nil, | ||||
| 		Events:  nil, | ||||
| 		Uptime:  nil, | ||||
| 		Name:   serviceName, | ||||
| 		Group:  serviceGroup, | ||||
| 		Key:    key, | ||||
| 		Uptime: nil, | ||||
| 	} | ||||
| 	if serviceStatus.Events, err = s.getEventsByServiceID(serviceID); err != nil { | ||||
| 		log.Printf("[database][GetServiceStatusByKey] Failed to retrieve events for key=%s: %s", key, err.Error()) | ||||
| 	} | ||||
| 	serviceStatus.Results, err = s.getResultsByServiceID(serviceID) | ||||
| 	// TODO: populate Uptime | ||||
| 	// TODO: add flag to decide whether to retrieve uptime or not | ||||
| 	return serviceStatus | ||||
| } | ||||
|  | ||||
| @ -150,7 +158,7 @@ func (s *Store) getServiceIDNameAndGroupByKey(key string) (id int64, group, name | ||||
| } | ||||
|  | ||||
| func (s *Store) getEventsByServiceID(serviceID int64) (events []*core.Event, err error) { | ||||
| 	rows, err := s.db.Query("SELECT event_type, event_timestamp FROM service_event WHERE service_id = $1 ORDER BY event_timestamp DESC LIMIT $2", serviceID, core.MaximumNumberOfEvents) | ||||
| 	rows, err := s.db.Query("SELECT event_type, event_timestamp FROM service_event WHERE service_id = $1 ORDER BY service_event_id DESC LIMIT $2", serviceID, core.MaximumNumberOfEvents) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @ -164,8 +172,9 @@ func (s *Store) getEventsByServiceID(serviceID int64) (events []*core.Event, err | ||||
| } | ||||
|  | ||||
| func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, err error) { | ||||
| 	rows, err := s.db.Query(` | ||||
| 			SELECT service_result_id, success, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp | ||||
| 	rows, err := s.db.Query( | ||||
| 		` | ||||
| 			SELECT service_result_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp | ||||
| 			FROM service_result | ||||
| 			WHERE service_id = $1 | ||||
| 			ORDER BY timestamp DESC | ||||
| @ -180,7 +189,9 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, | ||||
| 	for rows.Next() { | ||||
| 		result := &core.Result{} | ||||
| 		var id int64 | ||||
| 		_ = rows.Scan(&id, &result.Success, &result.Connected, &result.HTTPStatus, &result.DNSRCode, &result.CertificateExpiration, &result.Hostname, &result.IP, &result.Duration, &result.Timestamp) | ||||
| 		var joinedErrors string | ||||
| 		_ = rows.Scan(&id, &result.Success, &joinedErrors, &result.Connected, &result.HTTPStatus, &result.DNSRCode, &result.CertificateExpiration, &result.Hostname, &result.IP, &result.Duration, &result.Timestamp) | ||||
| 		result.Errors = strings.Split(joinedErrors, arraySeparator) | ||||
| 		results = append(results, result) | ||||
| 		idResultMap[id] = result | ||||
| 	} | ||||
| @ -191,10 +202,12 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, | ||||
| 		return | ||||
| 	} | ||||
| 	for serviceResultID, result := range idResultMap { | ||||
| 		rows, err = transaction.Query(` | ||||
| 			SELECT service_result_id, condition, success | ||||
| 			FROM service_result_condition | ||||
| 			WHERE service_result_id = $1`, | ||||
| 		rows, err = transaction.Query( | ||||
| 			` | ||||
| 				SELECT service_result_id, condition, success | ||||
| 				FROM service_result_condition | ||||
| 				WHERE service_result_id = $1 | ||||
| 			`, | ||||
| 			serviceResultID, | ||||
| 		) | ||||
| 		if err != nil { | ||||
| @ -248,29 +261,40 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Clean up old events if there's more than twice the maximum number of events | ||||
| 	// This lets us both keep the table clean without impacting performance too much | ||||
| 	// (since we're only deleting MaximumNumberOfEvents at a time instead of 1) | ||||
| 	if numberOfEvents > core.MaximumNumberOfEvents*2 { | ||||
| 		err = s.deleteOldEvents(tx, serviceID) | ||||
| 		if err != nil { | ||||
| 			log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	//log.Printf("there are currently %d events", numberOfEvents) | ||||
| 	if numberOfEvents == 0 { | ||||
| 		// There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event | ||||
| 		err = s.insertEvent(tx, serviceID, &core.Event{ | ||||
| 			Type:      core.EventStart, | ||||
| 			Timestamp: result.Timestamp.Add(-result.Duration), | ||||
| 			Timestamp: result.Timestamp.Add(-50 * time.Millisecond), | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			// Silently fail | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Name, service.Group, err.Error()) | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 		event := generateEventBasedOnResult(result) | ||||
| 		err = s.insertEvent(tx, serviceID, event) | ||||
| 		if err != nil { | ||||
| 			// Silently fail | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Name, service.Group, err.Error()) | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 	} else { | ||||
| 		// Get the success value of the previous result | ||||
| 		var lastResultSuccess bool | ||||
| 		lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) | ||||
| 		if err != nil { | ||||
| 			log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Name, service.Group, err.Error()) | ||||
| 			log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 		} else { | ||||
| 			// If we managed to retrieve the outcome of the previous result, we'll compare it with the new result. | ||||
| 			// If the final outcome (success or failure) of the previous and the new result aren't the same, it means | ||||
| @ -281,7 +305,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { | ||||
| 				err = s.insertEvent(tx, serviceID, event) | ||||
| 				if err != nil { | ||||
| 					// Silently fail | ||||
| 					log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Name, service.Group, err.Error()) | ||||
| 					log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @ -290,7 +314,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { | ||||
| 	err = s.insertResult(tx, serviceID, result) | ||||
| 	if err != nil { | ||||
| 		// Silently fail | ||||
| 		log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Name, service.Group, err.Error()) | ||||
| 		log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 	} | ||||
| 	//log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) | ||||
| 	err = tx.Commit() | ||||
| @ -372,7 +396,7 @@ func (s *Store) insertEvent(tx *sql.Tx, serviceID int64, event *core.Event) erro | ||||
| } | ||||
|  | ||||
| func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (bool, error) { | ||||
| 	rows, err := tx.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY timestamp LIMIT 1", serviceID) | ||||
| 	rows, err := tx.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY service_result_id DESC LIMIT 1", serviceID) | ||||
| 	if err != nil { | ||||
| 		return false, err | ||||
| 	} | ||||
| @ -392,11 +416,14 @@ func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (b | ||||
|  | ||||
| // insertResult inserts a result in the store | ||||
| func (s *Store) insertResult(tx *sql.Tx, serviceID int64, result *core.Result) error { | ||||
| 	res, err := tx.Exec(` | ||||
| 			INSERT INTO service_result (service_id, success, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp) | ||||
| 			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, | ||||
| 	res, err := tx.Exec( | ||||
| 		` | ||||
| 			INSERT INTO service_result (service_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp) | ||||
| 			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) | ||||
| 		`, | ||||
| 		serviceID, | ||||
| 		result.Success, | ||||
| 		strings.Join(result.Errors, arraySeparator), | ||||
| 		result.Connected, | ||||
| 		result.HTTPStatus, | ||||
| 		result.DNSRCode, | ||||
| @ -432,6 +459,31 @@ func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, condit | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // insertService inserts a service in the store and returns the generated id of said service | ||||
| func (s *Store) deleteOldEvents(tx *sql.Tx, serviceID int64) error { | ||||
| 	result, err := tx.Exec( | ||||
| 		` | ||||
| 			DELETE FROM service_event  | ||||
| 			WHERE service_id = $1  | ||||
| 			  AND service_event_id NOT IN ( | ||||
| 			      SELECT service_event_id  | ||||
| 			      FROM service_event | ||||
| 			      WHERE service_id = $1 | ||||
| 			      ORDER BY service_event_id DESC | ||||
| 			      LIMIT $2 | ||||
| 			  	) | ||||
| 		`, | ||||
| 		serviceID, | ||||
| 		core.MaximumNumberOfEvents, | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	rowsAffected, _ := result.RowsAffected() | ||||
| 	log.Printf("deleted %d rows", rowsAffected) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // DeleteAllServiceStatusesNotInKeys removes all rows owned by a service whose key is not within the keys provided | ||||
| func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int { | ||||
| 	panic("implement me") | ||||
| @ -447,6 +499,7 @@ func (s *Store) Save() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Close the database handle | ||||
| func (s *Store) Close() { | ||||
| 	_ = s.db.Close() | ||||
| } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user