Refactor code and enable WAL for 4x performance improvement
This commit is contained in:
		| @ -47,6 +47,13 @@ func NewStore(driver, path string) (*Store, error) { | ||||
| 	if store.db, err = sql.Open(driver, path); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if driver == "sqlite" { | ||||
| 		_, _ = store.db.Exec("PRAGMA foreign_keys=ON") | ||||
| 		_, _ = store.db.Exec("PRAGMA journal_mode=WAL") | ||||
| 		// Prevents driver from running into "database is locked" errors | ||||
| 		// This is because we're using WAL to improve performance | ||||
| 		store.db.SetMaxOpenConns(1) | ||||
| 	} | ||||
| 	if err = store.createSchema(); err != nil { | ||||
| 		_ = store.db.Close() | ||||
| 		return nil, err | ||||
| @ -71,7 +78,7 @@ func (s *Store) createSchema() error { | ||||
| 	_, err = s.db.Exec(` | ||||
| 		CREATE TABLE IF NOT EXISTS service_event ( | ||||
| 		    service_event_id   INTEGER PRIMARY KEY, | ||||
| 		    service_id         INTEGER REFERENCES service(id), | ||||
| 		    service_id         INTEGER REFERENCES service(service_id) ON DELETE CASCADE, | ||||
| 			event_type         TEXT, | ||||
| 			event_timestamp    TIMESTAMP | ||||
| 		) | ||||
| @ -82,7 +89,7 @@ func (s *Store) createSchema() error { | ||||
| 	_, err = s.db.Exec(` | ||||
| 		CREATE TABLE IF NOT EXISTS service_result ( | ||||
| 		    service_result_id      INTEGER PRIMARY KEY, | ||||
| 		    service_id             INTEGER REFERENCES service(id), | ||||
| 		    service_id             INTEGER REFERENCES service(service_id) ON DELETE CASCADE, | ||||
| 			success                INTEGER, | ||||
| 		    errors                 TEXT, | ||||
| 		    connected              INTEGER, | ||||
| @ -101,7 +108,7 @@ func (s *Store) createSchema() error { | ||||
| 	_, err = s.db.Exec(` | ||||
| 		CREATE TABLE IF NOT EXISTS service_result_condition ( | ||||
| 		    service_result_condition_id  INTEGER PRIMARY KEY, | ||||
| 		    service_result_id            INTEGER REFERENCES service_result(service_result_id), | ||||
| 		    service_result_id            INTEGER REFERENCES service_result(service_result_id) ON DELETE CASCADE, | ||||
| 		    condition                    TEXT, | ||||
| 		    success                      INTEGER | ||||
| 		) | ||||
| @ -143,6 +150,127 @@ func (s *Store) GetServiceStatusByKey(key string) *core.ServiceStatus { | ||||
| 	return serviceStatus | ||||
| } | ||||
|  | ||||
| // Insert adds the observed result for the specified service into the store | ||||
| func (s *Store) Insert(service *core.Service, result *core.Result) { | ||||
| 	tx, err := s.db.Begin() | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	//start := time.Now() | ||||
| 	serviceID, err := s.getServiceID(tx, service) | ||||
| 	if err != nil { | ||||
| 		if err == errServiceNotFoundInDatabase { | ||||
| 			// Service doesn't exist in the database, insert it | ||||
| 			if serviceID, err = s.insertService(tx, service); err != nil { | ||||
| 				return // failed to insert service | ||||
| 			} | ||||
| 		} else { | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	// First, we need to check if we need to insert a new event. | ||||
| 	// | ||||
| 	// A new event must be added if either of the following cases happen: | ||||
| 	// 1. There is only 1 event. The total number of events for a service can only be 1 if the only existing event is | ||||
| 	//    of type EventStart, in which case we will have to create a new event of type EventHealthy or EventUnhealthy | ||||
| 	//    based on result.Success. | ||||
| 	// 2. The lastResult.Success != result.Success. This implies that the service went from healthy to unhealthy or | ||||
| 	//    vice-versa, in which case we will have to create a new event of type EventHealthy or EventUnhealthy | ||||
| 	//	  based on result.Success. | ||||
| 	numberOfEvents, err := s.getNumberOfEventsByServiceID(tx, serviceID) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if numberOfEvents == 0 { | ||||
| 		// There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event | ||||
| 		err = s.insertEvent(tx, serviceID, &core.Event{ | ||||
| 			Type:      core.EventStart, | ||||
| 			Timestamp: result.Timestamp.Add(-50 * time.Millisecond), | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			// Silently fail | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 		event := generateEventBasedOnResult(result) | ||||
| 		err = s.insertEvent(tx, serviceID, event) | ||||
| 		if err != nil { | ||||
| 			// Silently fail | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 	} else { | ||||
| 		// Get the success value of the previous result | ||||
| 		var lastResultSuccess bool | ||||
| 		lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) | ||||
| 		if err != nil { | ||||
| 			log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 		} else { | ||||
| 			// If we managed to retrieve the outcome of the previous result, we'll compare it with the new result. | ||||
| 			// If the final outcome (success or failure) of the previous and the new result aren't the same, it means | ||||
| 			// that the service either went from Healthy to Unhealthy or Unhealthy -> Healthy, therefore, we'll add | ||||
| 			// an event to mark the change in state | ||||
| 			if lastResultSuccess != result.Success { | ||||
| 				event := generateEventBasedOnResult(result) | ||||
| 				err = s.insertEvent(tx, serviceID, event) | ||||
| 				if err != nil { | ||||
| 					// Silently fail | ||||
| 					log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		// Clean up old events if there's more than twice the maximum number of events | ||||
| 		// This lets us both keep the table clean without impacting performance too much | ||||
| 		// (since we're only deleting MaximumNumberOfEvents at a time instead of 1) | ||||
| 		if numberOfEvents > core.MaximumNumberOfEvents*2 { | ||||
| 			err = s.deleteOldEvents(tx, serviceID) | ||||
| 			if err != nil { | ||||
| 				log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	// Second, we need to insert the result. | ||||
| 	err = s.insertResult(tx, serviceID, result) | ||||
| 	if err != nil { | ||||
| 		// Silently fail | ||||
| 		log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 	} | ||||
| 	// Clean up old results | ||||
| 	numberOfResults, err := s.getNumberOfResultsByServiceID(tx, serviceID) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if numberOfResults > core.MaximumNumberOfResults*2 { | ||||
| 		err = s.deleteOldResults(tx, serviceID) | ||||
| 		if err != nil { | ||||
| 			log.Printf("[database][Insert] Failed to delete old results for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 	} | ||||
| 	//log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) | ||||
| 	if err = tx.Commit(); err != nil { | ||||
| 		_ = tx.Rollback() | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // DeleteAllServiceStatusesNotInKeys removes all rows owned by a service whose key is not within the keys provided | ||||
| func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int { | ||||
| 	panic("implement me") | ||||
| } | ||||
|  | ||||
| // Clear deletes everything from the store | ||||
| func (s *Store) Clear() { | ||||
| 	_, _ = s.db.Exec("DELETE FROM service") | ||||
| } | ||||
|  | ||||
| // Save does nothing, because this store is immediately persistent. | ||||
| func (s *Store) Save() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Close the database handle | ||||
| func (s *Store) Close() { | ||||
| 	_ = s.db.Close() | ||||
| } | ||||
|  | ||||
| func (s *Store) getServiceIDNameAndGroupByKey(key string) (id int64, group, name string, err error) { | ||||
| 	rows, err := s.db.Query("SELECT service_id, service_name, service_group FROM service WHERE service_key = $1 LIMIT 1", key) | ||||
| 	if err != nil { | ||||
| @ -151,6 +279,7 @@ func (s *Store) getServiceIDNameAndGroupByKey(key string) (id int64, group, name | ||||
| 	for rows.Next() { | ||||
| 		_ = rows.Scan(&id, &name, &group) | ||||
| 	} | ||||
| 	_ = rows.Close() | ||||
| 	if id == 0 { | ||||
| 		return 0, "", "", errServiceNotFoundInDatabase | ||||
| 	} | ||||
| @ -178,7 +307,8 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, | ||||
| 			FROM service_result | ||||
| 			WHERE service_id = $1 | ||||
| 			ORDER BY timestamp DESC | ||||
| 			LIMIT $2`, | ||||
| 			LIMIT $2 | ||||
| 		`, | ||||
| 		serviceID, | ||||
| 		core.MaximumNumberOfResults, | ||||
| 	) | ||||
| @ -222,108 +352,13 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, | ||||
| 		} | ||||
| 		_ = rows.Close() | ||||
| 	} | ||||
| 	err = transaction.Commit() | ||||
| 	if err != nil { | ||||
| 	if err = transaction.Commit(); err != nil { | ||||
| 		_ = transaction.Rollback() | ||||
| 		return | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // Insert adds the observed result for the specified service into the store | ||||
| func (s *Store) Insert(service *core.Service, result *core.Result) { | ||||
| 	tx, err := s.db.Begin() | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	//start := time.Now() | ||||
| 	serviceID, err := s.getServiceID(tx, service) | ||||
| 	if err != nil { | ||||
| 		if err == errServiceNotFoundInDatabase { | ||||
| 			// Service doesn't exist in the database, insert it | ||||
| 			if serviceID, err = s.insertService(tx, service); err != nil { | ||||
| 				return // failed to insert service | ||||
| 			} | ||||
| 		} else { | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	// First, we need to check if we need to insert a new event. | ||||
| 	// | ||||
| 	// A new event must be added if either of the following cases happen: | ||||
| 	// 1. There is only 1 event. The total number of events for a service can only be 1 if the only existing event is | ||||
| 	//    of type EventStart, in which case we will have to create a new event of type EventHealthy or EventUnhealthy | ||||
| 	//    based on result.Success. | ||||
| 	// 2. The lastResult.Success != result.Success. This implies that the service went from healthy to unhealthy or | ||||
| 	//    vice-versa, in which case we will have to create a new event of type EventHealthy or EventUnhealthy | ||||
| 	//	  based on result.Success. | ||||
| 	numberOfEvents, err := s.getNumberOfEventsByServiceID(tx, serviceID) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Clean up old events if there's more than twice the maximum number of events | ||||
| 	// This lets us both keep the table clean without impacting performance too much | ||||
| 	// (since we're only deleting MaximumNumberOfEvents at a time instead of 1) | ||||
| 	if numberOfEvents > core.MaximumNumberOfEvents*2 { | ||||
| 		err = s.deleteOldEvents(tx, serviceID) | ||||
| 		if err != nil { | ||||
| 			log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	//log.Printf("there are currently %d events", numberOfEvents) | ||||
| 	if numberOfEvents == 0 { | ||||
| 		// There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event | ||||
| 		err = s.insertEvent(tx, serviceID, &core.Event{ | ||||
| 			Type:      core.EventStart, | ||||
| 			Timestamp: result.Timestamp.Add(-50 * time.Millisecond), | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			// Silently fail | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 		event := generateEventBasedOnResult(result) | ||||
| 		err = s.insertEvent(tx, serviceID, event) | ||||
| 		if err != nil { | ||||
| 			// Silently fail | ||||
| 			log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) | ||||
| 		} | ||||
| 	} else { | ||||
| 		// Get the success value of the previous result | ||||
| 		var lastResultSuccess bool | ||||
| 		lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) | ||||
| 		if err != nil { | ||||
| 			log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 		} else { | ||||
| 			// If we managed to retrieve the outcome of the previous result, we'll compare it with the new result. | ||||
| 			// If the final outcome (success or failure) of the previous and the new result aren't the same, it means | ||||
| 			// that the service either went from Healthy to Unhealthy or Unhealthy -> Healthy, therefore, we'll add | ||||
| 			// an event to mark the change in state | ||||
| 			if lastResultSuccess != result.Success { | ||||
| 				event := generateEventBasedOnResult(result) | ||||
| 				err = s.insertEvent(tx, serviceID, event) | ||||
| 				if err != nil { | ||||
| 					// Silently fail | ||||
| 					log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	// Second, we need to insert the result. | ||||
| 	err = s.insertResult(tx, serviceID, result) | ||||
| 	if err != nil { | ||||
| 		// Silently fail | ||||
| 		log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) | ||||
| 	} | ||||
| 	//log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) | ||||
| 	err = tx.Commit() | ||||
| 	if err != nil { | ||||
| 		_ = tx.Rollback() | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (s *Store) getServiceID(tx *sql.Tx, service *core.Service) (int64, error) { | ||||
| 	rows, err := tx.Query( | ||||
| 		"SELECT service_id FROM service WHERE service_key = $1", | ||||
| @ -381,6 +416,25 @@ func (s *Store) getNumberOfEventsByServiceID(tx *sql.Tx, serviceID int64) (int64 | ||||
| 	return numberOfEvents, nil | ||||
| } | ||||
|  | ||||
| func (s *Store) getNumberOfResultsByServiceID(tx *sql.Tx, serviceID int64) (int64, error) { | ||||
| 	rows, err := tx.Query("SELECT COUNT(1) FROM service_result WHERE service_id = $1", serviceID) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	var numberOfResults int64 | ||||
| 	var found bool | ||||
| 	for rows.Next() { | ||||
| 		_ = rows.Scan(&numberOfResults) | ||||
| 		found = true | ||||
| 		break | ||||
| 	} | ||||
| 	_ = rows.Close() | ||||
| 	if !found { | ||||
| 		return 0, errNoRowsReturned | ||||
| 	} | ||||
| 	return numberOfResults, nil | ||||
| } | ||||
|  | ||||
| // insertEvent inserts a service event in the store | ||||
| func (s *Store) insertEvent(tx *sql.Tx, serviceID int64, event *core.Event) error { | ||||
| 	_, err := tx.Exec( | ||||
| @ -459,9 +513,9 @@ func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, condit | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // insertService inserts a service in the store and returns the generated id of said service | ||||
| // deleteOldEvents deletes old service events that are no longer needed | ||||
| func (s *Store) deleteOldEvents(tx *sql.Tx, serviceID int64) error { | ||||
| 	result, err := tx.Exec( | ||||
| 	_, err := tx.Exec( | ||||
| 		` | ||||
| 			DELETE FROM service_event  | ||||
| 			WHERE service_id = $1  | ||||
| @ -479,27 +533,32 @@ func (s *Store) deleteOldEvents(tx *sql.Tx, serviceID int64) error { | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	rowsAffected, _ := result.RowsAffected() | ||||
| 	log.Printf("deleted %d rows", rowsAffected) | ||||
| 	//rowsAffected, _ := result.RowsAffected() | ||||
| 	//log.Printf("deleted %d rows from service_event", rowsAffected) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // DeleteAllServiceStatusesNotInKeys removes all rows owned by a service whose key is not within the keys provided | ||||
| func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int { | ||||
| 	panic("implement me") | ||||
| } | ||||
|  | ||||
| // Clear deletes everything from the store | ||||
| func (s *Store) Clear() { | ||||
| 	panic("implement me") | ||||
| } | ||||
|  | ||||
| // Save does nothing, because this store is immediately persistent. | ||||
| func (s *Store) Save() error { | ||||
| // deleteOldResults deletes old service results that are no longer needed | ||||
| func (s *Store) deleteOldResults(tx *sql.Tx, serviceID int64) error { | ||||
| 	_, err := tx.Exec( | ||||
| 		` | ||||
| 			DELETE FROM service_result  | ||||
| 			WHERE service_id = $1  | ||||
| 			  AND service_result_id NOT IN ( | ||||
| 			      SELECT service_result_id | ||||
| 			      FROM service_result | ||||
| 			      WHERE service_id = $1 | ||||
| 			      ORDER BY service_result_id DESC | ||||
| 			      LIMIT $2 | ||||
| 			  	) | ||||
| 		`, | ||||
| 		serviceID, | ||||
| 		core.MaximumNumberOfResults, | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	//rowsAffected, _ := result.RowsAffected() | ||||
| 	//log.Printf("deleted %d rows from service_result", rowsAffected) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Close the database handle | ||||
| func (s *Store) Close() { | ||||
| 	_ = s.db.Close() | ||||
| } | ||||
|  | ||||
| @ -33,6 +33,8 @@ type Store interface { | ||||
| 	Save() error | ||||
| } | ||||
|  | ||||
| // TODO: add method to check state of store (by keeping track of silent errors) | ||||
|  | ||||
| var ( | ||||
| 	// Validate interface implementation on compile | ||||
| 	_ Store = (*memory.Store)(nil) | ||||
|  | ||||
		Reference in New Issue
	
	Block a user