Rename Service to Endpoint (#192)
* Add clarifications in comments * #191: Rename Service to Endpoint
This commit is contained in:
@ -89,44 +89,44 @@ func (s *Store) createSchema() error {
|
||||
return s.createPostgresSchema()
|
||||
}
|
||||
|
||||
// GetAllServiceStatuses returns all monitored core.ServiceStatus
|
||||
// GetAllEndpointStatuses returns all monitored core.EndpointStatus
|
||||
// with a subset of core.Result defined by the page and pageSize parameters
|
||||
func (s *Store) GetAllServiceStatuses(params *paging.ServiceStatusParams) ([]*core.ServiceStatus, error) {
|
||||
func (s *Store) GetAllEndpointStatuses(params *paging.EndpointStatusParams) ([]*core.EndpointStatus, error) {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys, err := s.getAllServiceKeys(tx)
|
||||
keys, err := s.getAllEndpointKeys(tx)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return nil, err
|
||||
}
|
||||
serviceStatuses := make([]*core.ServiceStatus, 0, len(keys))
|
||||
endpointStatuses := make([]*core.EndpointStatus, 0, len(keys))
|
||||
for _, key := range keys {
|
||||
serviceStatus, err := s.getServiceStatusByKey(tx, key, params)
|
||||
endpointStatus, err := s.getEndpointStatusByKey(tx, key, params)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
serviceStatuses = append(serviceStatuses, serviceStatus)
|
||||
endpointStatuses = append(endpointStatuses, endpointStatus)
|
||||
}
|
||||
if err = tx.Commit(); err != nil {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
return serviceStatuses, err
|
||||
return endpointStatuses, err
|
||||
}
|
||||
|
||||
// GetServiceStatus returns the service status for a given service name in the given group
|
||||
func (s *Store) GetServiceStatus(groupName, serviceName string, params *paging.ServiceStatusParams) (*core.ServiceStatus, error) {
|
||||
return s.GetServiceStatusByKey(util.ConvertGroupAndServiceToKey(groupName, serviceName), params)
|
||||
// GetEndpointStatus returns the endpoint status for a given endpoint name in the given group
|
||||
func (s *Store) GetEndpointStatus(groupName, endpointName string, params *paging.EndpointStatusParams) (*core.EndpointStatus, error) {
|
||||
return s.GetEndpointStatusByKey(util.ConvertGroupAndEndpointNameToKey(groupName, endpointName), params)
|
||||
}
|
||||
|
||||
// GetServiceStatusByKey returns the service status for a given key
|
||||
func (s *Store) GetServiceStatusByKey(key string, params *paging.ServiceStatusParams) (*core.ServiceStatus, error) {
|
||||
// GetEndpointStatusByKey returns the endpoint status for a given key
|
||||
func (s *Store) GetEndpointStatusByKey(key string, params *paging.EndpointStatusParams) (*core.EndpointStatus, error) {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serviceStatus, err := s.getServiceStatusByKey(tx, key, params)
|
||||
endpointStatus, err := s.getEndpointStatusByKey(tx, key, params)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return nil, err
|
||||
@ -134,7 +134,7 @@ func (s *Store) GetServiceStatusByKey(key string, params *paging.ServiceStatusPa
|
||||
if err = tx.Commit(); err != nil {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
return serviceStatus, err
|
||||
return endpointStatus, err
|
||||
}
|
||||
|
||||
// GetUptimeByKey returns the uptime percentage during a time range
|
||||
@ -146,12 +146,12 @@ func (s *Store) GetUptimeByKey(key string, from, to time.Time) (float64, error)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
serviceID, _, _, err := s.getServiceIDGroupAndNameByKey(tx, key)
|
||||
endpointID, _, _, err := s.getEndpointIDGroupAndNameByKey(tx, key)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
uptime, _, err := s.getServiceUptime(tx, serviceID, from, to)
|
||||
uptime, _, err := s.getEndpointUptime(tx, endpointID, from, to)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, err
|
||||
@ -171,12 +171,12 @@ func (s *Store) GetAverageResponseTimeByKey(key string, from, to time.Time) (int
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
serviceID, _, _, err := s.getServiceIDGroupAndNameByKey(tx, key)
|
||||
endpointID, _, _, err := s.getEndpointIDGroupAndNameByKey(tx, key)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
averageResponseTime, err := s.getServiceAverageResponseTime(tx, serviceID, from, to)
|
||||
averageResponseTime, err := s.getEndpointAverageResponseTime(tx, endpointID, from, to)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return 0, err
|
||||
@ -196,12 +196,12 @@ func (s *Store) GetHourlyAverageResponseTimeByKey(key string, from, to time.Time
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serviceID, _, _, err := s.getServiceIDGroupAndNameByKey(tx, key)
|
||||
endpointID, _, _, err := s.getEndpointIDGroupAndNameByKey(tx, key)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return nil, err
|
||||
}
|
||||
hourlyAverageResponseTimes, err := s.getServiceHourlyAverageResponseTimes(tx, serviceID, from, to)
|
||||
hourlyAverageResponseTimes, err := s.getEndpointHourlyAverageResponseTimes(tx, endpointID, from, to)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return nil, err
|
||||
@ -212,71 +212,71 @@ func (s *Store) GetHourlyAverageResponseTimeByKey(key string, from, to time.Time
|
||||
return hourlyAverageResponseTimes, nil
|
||||
}
|
||||
|
||||
// Insert adds the observed result for the specified service into the store
|
||||
func (s *Store) Insert(service *core.Service, result *core.Result) error {
|
||||
// Insert adds the observed result for the specified endpoint into the store
|
||||
func (s *Store) Insert(endpoint *core.Endpoint, result *core.Result) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serviceID, err := s.getServiceID(tx, service)
|
||||
endpointID, err := s.getEndpointID(tx, endpoint)
|
||||
if err != nil {
|
||||
if err == common.ErrServiceNotFound {
|
||||
// Service doesn't exist in the database, insert it
|
||||
if serviceID, err = s.insertService(tx, service); err != nil {
|
||||
if err == common.ErrEndpointNotFound {
|
||||
// Endpoint doesn't exist in the database, insert it
|
||||
if endpointID, err = s.insertEndpoint(tx, endpoint); err != nil {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("[sql][Insert] Failed to create service with group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to create endpoint with group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
_ = tx.Rollback()
|
||||
log.Printf("[sql][Insert] Failed to retrieve id of service with group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to retrieve id of endpoint with group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
// First, we need to check if we need to insert a new event.
|
||||
//
|
||||
// A new event must be added if either of the following cases happen:
|
||||
// 1. There is only 1 event. The total number of events for a service can only be 1 if the only existing event is
|
||||
// 1. There is only 1 event. The total number of events for a endpoint can only be 1 if the only existing event is
|
||||
// of type EventStart, in which case we will have to create a new event of type EventHealthy or EventUnhealthy
|
||||
// based on result.Success.
|
||||
// 2. The lastResult.Success != result.Success. This implies that the service went from healthy to unhealthy or
|
||||
// 2. The lastResult.Success != result.Success. This implies that the endpoint went from healthy to unhealthy or
|
||||
// vice-versa, in which case we will have to create a new event of type EventHealthy or EventUnhealthy
|
||||
// based on result.Success.
|
||||
numberOfEvents, err := s.getNumberOfEventsByServiceID(tx, serviceID)
|
||||
numberOfEvents, err := s.getNumberOfEventsByEndpointID(tx, endpointID)
|
||||
if err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql][Insert] Failed to retrieve total number of events for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to retrieve total number of events for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
if numberOfEvents == 0 {
|
||||
// There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event
|
||||
err = s.insertEvent(tx, serviceID, &core.Event{
|
||||
err = s.insertEndpointEvent(tx, endpointID, &core.Event{
|
||||
Type: core.EventStart,
|
||||
Timestamp: result.Timestamp.Add(-50 * time.Millisecond),
|
||||
})
|
||||
if err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to insert event=%s for group=%s; endpoint=%s: %s", core.EventStart, endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
event := core.NewEventFromResult(result)
|
||||
if err = s.insertEvent(tx, serviceID, event); err != nil {
|
||||
if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to insert event=%s for group=%s; endpoint=%s: %s", event.Type, endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
} else {
|
||||
// Get the success value of the previous result
|
||||
var lastResultSuccess bool
|
||||
if lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
if lastResultSuccess, err = s.getLastEndpointResultSuccessValue(tx, endpointID); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to retrieve outcome of previous result for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
} else {
|
||||
// If we managed to retrieve the outcome of the previous result, we'll compare it with the new result.
|
||||
// If the final outcome (success or failure) of the previous and the new result aren't the same, it means
|
||||
// that the service either went from Healthy to Unhealthy or Unhealthy -> Healthy, therefore, we'll add
|
||||
// that the endpoint either went from Healthy to Unhealthy or Unhealthy -> Healthy, therefore, we'll add
|
||||
// an event to mark the change in state
|
||||
if lastResultSuccess != result.Success {
|
||||
event := core.NewEventFromResult(result)
|
||||
if err = s.insertEvent(tx, serviceID, event); err != nil {
|
||||
if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
|
||||
// Silently fail
|
||||
log.Printf("[sql][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to insert event=%s for group=%s; endpoint=%s: %s", event.Type, endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -284,41 +284,41 @@ func (s *Store) Insert(service *core.Service, result *core.Result) error {
|
||||
// This lets us both keep the table clean without impacting performance too much
|
||||
// (since we're only deleting MaximumNumberOfEvents at a time instead of 1)
|
||||
if numberOfEvents > eventsCleanUpThreshold {
|
||||
if err = s.deleteOldServiceEvents(tx, serviceID); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
if err = s.deleteOldEndpointEvents(tx, endpointID); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to delete old events for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
// Second, we need to insert the result.
|
||||
if err = s.insertResult(tx, serviceID, result); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
if err = s.insertEndpointResult(tx, endpointID, result); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to insert result for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
_ = tx.Rollback() // If we can't insert the result, we'll rollback now since there's no point continuing
|
||||
return err
|
||||
}
|
||||
// Clean up old results
|
||||
numberOfResults, err := s.getNumberOfResultsByServiceID(tx, serviceID)
|
||||
numberOfResults, err := s.getNumberOfResultsByEndpointID(tx, endpointID)
|
||||
if err != nil {
|
||||
log.Printf("[sql][Insert] Failed to retrieve total number of results for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to retrieve total number of results for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
} else {
|
||||
if numberOfResults > resultsCleanUpThreshold {
|
||||
if err = s.deleteOldServiceResults(tx, serviceID); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to delete old results for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
if err = s.deleteOldEndpointResults(tx, endpointID); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to delete old results for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
// Finally, we need to insert the uptime data.
|
||||
// Because the uptime data significantly outlives the results, we can't rely on the results for determining the uptime
|
||||
if err = s.updateServiceUptime(tx, serviceID, result); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to update uptime for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
if err = s.updateEndpointUptime(tx, endpointID, result); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to update uptime for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
// Clean up old uptime entries
|
||||
ageOfOldestUptimeEntry, err := s.getAgeOfOldestServiceUptimeEntry(tx, serviceID)
|
||||
ageOfOldestUptimeEntry, err := s.getAgeOfOldestEndpointUptimeEntry(tx, endpointID)
|
||||
if err != nil {
|
||||
log.Printf("[sql][Insert] Failed to retrieve oldest service uptime entry for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
log.Printf("[sql][Insert] Failed to retrieve oldest endpoint uptime entry for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
} else {
|
||||
if ageOfOldestUptimeEntry > uptimeCleanUpThreshold {
|
||||
if err = s.deleteOldUptimeEntries(tx, serviceID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to delete old uptime entries for group=%s; service=%s: %s", service.Group, service.Name, err.Error())
|
||||
if err = s.deleteOldUptimeEntries(tx, endpointID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil {
|
||||
log.Printf("[sql][Insert] Failed to delete old uptime entries for group=%s; endpoint=%s: %s", endpoint.Group, endpoint.Name, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -328,25 +328,25 @@ func (s *Store) Insert(service *core.Service, result *core.Result) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteAllServiceStatusesNotInKeys removes all rows owned by a service whose key is not within the keys provided
|
||||
func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int {
|
||||
// DeleteAllEndpointStatusesNotInKeys removes all rows owned by an endpoint whose key is not within the keys provided
|
||||
func (s *Store) DeleteAllEndpointStatusesNotInKeys(keys []string) int {
|
||||
var err error
|
||||
var result sql.Result
|
||||
if len(keys) == 0 {
|
||||
// Delete everything
|
||||
result, err = s.db.Exec("DELETE FROM service")
|
||||
result, err = s.db.Exec("DELETE FROM endpoints")
|
||||
} else {
|
||||
args := make([]interface{}, 0, len(keys))
|
||||
query := "DELETE FROM service WHERE service_key NOT IN ("
|
||||
query := "DELETE FROM endpoints WHERE endpoint_key NOT IN ("
|
||||
for i := range keys {
|
||||
query += fmt.Sprintf("$%d,", i+1)
|
||||
args = append(args, keys[i])
|
||||
}
|
||||
query = query[:len(query)-1] + ")"
|
||||
query = query[:len(query)-1] + ")" // Remove the last comma and close the parenthesis
|
||||
result, err = s.db.Exec(query, args...)
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("[sql][DeleteAllServiceStatusesNotInKeys] Failed to delete rows that do not belong to any of keys=%v: %s", keys, err.Error())
|
||||
log.Printf("[sql][DeleteAllEndpointStatusesNotInKeys] Failed to delete rows that do not belong to any of keys=%v: %s", keys, err.Error())
|
||||
return 0
|
||||
}
|
||||
rowsAffects, _ := result.RowsAffected()
|
||||
@ -355,7 +355,7 @@ func (s *Store) DeleteAllServiceStatusesNotInKeys(keys []string) int {
|
||||
|
||||
// Clear deletes everything from the store
|
||||
func (s *Store) Clear() {
|
||||
_, _ = s.db.Exec("DELETE FROM service")
|
||||
_, _ = s.db.Exec("DELETE FROM endpoints")
|
||||
}
|
||||
|
||||
// Save does nothing, because this store is immediately persistent.
|
||||
@ -368,15 +368,15 @@ func (s *Store) Close() {
|
||||
_ = s.db.Close()
|
||||
}
|
||||
|
||||
// insertService inserts a service in the store and returns the generated id of said service
|
||||
func (s *Store) insertService(tx *sql.Tx, service *core.Service) (int64, error) {
|
||||
//log.Printf("[sql][insertService] Inserting service with group=%s and name=%s", service.Group, service.Name)
|
||||
// insertEndpoint inserts an endpoint in the store and returns the generated id of said endpoint
|
||||
func (s *Store) insertEndpoint(tx *sql.Tx, endpoint *core.Endpoint) (int64, error) {
|
||||
//log.Printf("[sql][insertEndpoint] Inserting endpoint with group=%s and name=%s", endpoint.Group, endpoint.Name)
|
||||
var id int64
|
||||
err := tx.QueryRow(
|
||||
"INSERT INTO service (service_key, service_name, service_group) VALUES ($1, $2, $3) RETURNING service_id",
|
||||
service.Key(),
|
||||
service.Name,
|
||||
service.Group,
|
||||
"INSERT INTO endpoints (endpoint_key, endpoint_name, endpoint_group) VALUES ($1, $2, $3) RETURNING endpoint_id",
|
||||
endpoint.Key(),
|
||||
endpoint.Name,
|
||||
endpoint.Group,
|
||||
).Scan(&id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -384,11 +384,11 @@ func (s *Store) insertService(tx *sql.Tx, service *core.Service) (int64, error)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// insertEvent inserts a service event in the store
|
||||
func (s *Store) insertEvent(tx *sql.Tx, serviceID int64, event *core.Event) error {
|
||||
// insertEndpointEvent inserts en event in the store
|
||||
func (s *Store) insertEndpointEvent(tx *sql.Tx, endpointID int64, event *core.Event) error {
|
||||
_, err := tx.Exec(
|
||||
"INSERT INTO service_event (service_id, event_type, event_timestamp) VALUES ($1, $2, $3)",
|
||||
serviceID,
|
||||
"INSERT INTO endpoint_events (endpoint_id, event_type, event_timestamp) VALUES ($1, $2, $3)",
|
||||
endpointID,
|
||||
event.Type,
|
||||
event.Timestamp.UTC(),
|
||||
)
|
||||
@ -398,16 +398,16 @@ func (s *Store) insertEvent(tx *sql.Tx, serviceID int64, event *core.Event) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// insertResult inserts a result in the store
|
||||
func (s *Store) insertResult(tx *sql.Tx, serviceID int64, result *core.Result) error {
|
||||
var serviceResultID int64
|
||||
// insertEndpointResult inserts a result in the store
|
||||
func (s *Store) insertEndpointResult(tx *sql.Tx, endpointID int64, result *core.Result) error {
|
||||
var endpointResultID int64
|
||||
err := tx.QueryRow(
|
||||
`
|
||||
INSERT INTO service_result (service_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp)
|
||||
INSERT INTO endpoint_results (endpoint_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||
RETURNING service_result_id
|
||||
RETURNING endpoint_result_id
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
result.Success,
|
||||
strings.Join(result.Errors, arraySeparator),
|
||||
result.Connected,
|
||||
@ -418,18 +418,18 @@ func (s *Store) insertResult(tx *sql.Tx, serviceID int64, result *core.Result) e
|
||||
result.IP,
|
||||
result.Duration,
|
||||
result.Timestamp.UTC(),
|
||||
).Scan(&serviceResultID)
|
||||
).Scan(&endpointResultID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.insertConditionResults(tx, serviceResultID, result.ConditionResults)
|
||||
return s.insertConditionResults(tx, endpointResultID, result.ConditionResults)
|
||||
}
|
||||
|
||||
func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, conditionResults []*core.ConditionResult) error {
|
||||
func (s *Store) insertConditionResults(tx *sql.Tx, endpointResultID int64, conditionResults []*core.ConditionResult) error {
|
||||
var err error
|
||||
for _, cr := range conditionResults {
|
||||
_, err = tx.Exec("INSERT INTO service_result_condition (service_result_id, condition, success) VALUES ($1, $2, $3)",
|
||||
serviceResultID,
|
||||
_, err = tx.Exec("INSERT INTO endpoint_result_conditions (endpoint_result_id, condition, success) VALUES ($1, $2, $3)",
|
||||
endpointResultID,
|
||||
cr.Condition,
|
||||
cr.Success,
|
||||
)
|
||||
@ -440,7 +440,7 @@ func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, condit
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) updateServiceUptime(tx *sql.Tx, serviceID int64, result *core.Result) error {
|
||||
func (s *Store) updateEndpointUptime(tx *sql.Tx, endpointID int64, result *core.Result) error {
|
||||
unixTimestampFlooredAtHour := result.Timestamp.Truncate(time.Hour).Unix()
|
||||
var successfulExecutions int
|
||||
if result.Success {
|
||||
@ -448,14 +448,14 @@ func (s *Store) updateServiceUptime(tx *sql.Tx, serviceID int64, result *core.Re
|
||||
}
|
||||
_, err := tx.Exec(
|
||||
`
|
||||
INSERT INTO service_uptime (service_id, hour_unix_timestamp, total_executions, successful_executions, total_response_time)
|
||||
INSERT INTO endpoint_uptimes (endpoint_id, hour_unix_timestamp, total_executions, successful_executions, total_response_time)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT(service_id, hour_unix_timestamp) DO UPDATE SET
|
||||
total_executions = excluded.total_executions + service_uptime.total_executions,
|
||||
successful_executions = excluded.successful_executions + service_uptime.successful_executions,
|
||||
total_response_time = excluded.total_response_time + service_uptime.total_response_time
|
||||
ON CONFLICT(endpoint_id, hour_unix_timestamp) DO UPDATE SET
|
||||
total_executions = excluded.total_executions + endpoint_uptimes.total_executions,
|
||||
successful_executions = excluded.successful_executions + endpoint_uptimes.successful_executions,
|
||||
total_response_time = excluded.total_response_time + endpoint_uptimes.total_response_time
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
unixTimestampFlooredAtHour,
|
||||
1,
|
||||
successfulExecutions,
|
||||
@ -464,8 +464,8 @@ func (s *Store) updateServiceUptime(tx *sql.Tx, serviceID int64, result *core.Re
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) getAllServiceKeys(tx *sql.Tx) (keys []string, err error) {
|
||||
rows, err := tx.Query("SELECT service_key FROM service ORDER BY service_key")
|
||||
func (s *Store) getAllEndpointKeys(tx *sql.Tx) (keys []string, err error) {
|
||||
rows, err := tx.Query("SELECT endpoint_key FROM endpoints ORDER BY endpoint_key")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -477,60 +477,60 @@ func (s *Store) getAllServiceKeys(tx *sql.Tx) (keys []string, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) getServiceStatusByKey(tx *sql.Tx, key string, parameters *paging.ServiceStatusParams) (*core.ServiceStatus, error) {
|
||||
serviceID, serviceGroup, serviceName, err := s.getServiceIDGroupAndNameByKey(tx, key)
|
||||
func (s *Store) getEndpointStatusByKey(tx *sql.Tx, key string, parameters *paging.EndpointStatusParams) (*core.EndpointStatus, error) {
|
||||
endpointID, group, endpointName, err := s.getEndpointIDGroupAndNameByKey(tx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serviceStatus := core.NewServiceStatus(key, serviceGroup, serviceName)
|
||||
endpointStatus := core.NewEndpointStatus(group, endpointName)
|
||||
if parameters.EventsPageSize > 0 {
|
||||
if serviceStatus.Events, err = s.getEventsByServiceID(tx, serviceID, parameters.EventsPage, parameters.EventsPageSize); err != nil {
|
||||
log.Printf("[sql][getServiceStatusByKey] Failed to retrieve events for key=%s: %s", key, err.Error())
|
||||
if endpointStatus.Events, err = s.getEndpointEventsByEndpointID(tx, endpointID, parameters.EventsPage, parameters.EventsPageSize); err != nil {
|
||||
log.Printf("[sql][getEndpointStatusByKey] Failed to retrieve events for key=%s: %s", key, err.Error())
|
||||
}
|
||||
}
|
||||
if parameters.ResultsPageSize > 0 {
|
||||
if serviceStatus.Results, err = s.getResultsByServiceID(tx, serviceID, parameters.ResultsPage, parameters.ResultsPageSize); err != nil {
|
||||
log.Printf("[sql][getServiceStatusByKey] Failed to retrieve results for key=%s: %s", key, err.Error())
|
||||
if endpointStatus.Results, err = s.getEndpointResultsByEndpointID(tx, endpointID, parameters.ResultsPage, parameters.ResultsPageSize); err != nil {
|
||||
log.Printf("[sql][getEndpointStatusByKey] Failed to retrieve results for key=%s: %s", key, err.Error())
|
||||
}
|
||||
}
|
||||
//if parameters.IncludeUptime {
|
||||
// now := time.Now()
|
||||
// serviceStatus.Uptime.LastHour, _, err = s.getServiceUptime(tx, serviceID, now.Add(-time.Hour), now)
|
||||
// serviceStatus.Uptime.LastTwentyFourHours, _, err = s.getServiceUptime(tx, serviceID, now.Add(-24*time.Hour), now)
|
||||
// serviceStatus.Uptime.LastSevenDays, _, err = s.getServiceUptime(tx, serviceID, now.Add(-7*24*time.Hour), now)
|
||||
// endpointStatus.Uptime.LastHour, _, err = s.getEndpointUptime(tx, endpointID, now.Add(-time.Hour), now)
|
||||
// endpointStatus.Uptime.LastTwentyFourHours, _, err = s.getEndpointUptime(tx, endpointID, now.Add(-24*time.Hour), now)
|
||||
// endpointStatus.Uptime.LastSevenDays, _, err = s.getEndpointUptime(tx, endpointID, now.Add(-7*24*time.Hour), now)
|
||||
//}
|
||||
return serviceStatus, nil
|
||||
return endpointStatus, nil
|
||||
}
|
||||
|
||||
func (s *Store) getServiceIDGroupAndNameByKey(tx *sql.Tx, key string) (id int64, group, name string, err error) {
|
||||
func (s *Store) getEndpointIDGroupAndNameByKey(tx *sql.Tx, key string) (id int64, group, name string, err error) {
|
||||
err = tx.QueryRow(
|
||||
`
|
||||
SELECT service_id, service_group, service_name
|
||||
FROM service
|
||||
WHERE service_key = $1
|
||||
SELECT endpoint_id, endpoint_group, endpoint_name
|
||||
FROM endpoints
|
||||
WHERE endpoint_key = $1
|
||||
LIMIT 1
|
||||
`,
|
||||
key,
|
||||
).Scan(&id, &group, &name)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return 0, "", "", common.ErrServiceNotFound
|
||||
return 0, "", "", common.ErrEndpointNotFound
|
||||
}
|
||||
return 0, "", "", err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) getEventsByServiceID(tx *sql.Tx, serviceID int64, page, pageSize int) (events []*core.Event, err error) {
|
||||
func (s *Store) getEndpointEventsByEndpointID(tx *sql.Tx, endpointID int64, page, pageSize int) (events []*core.Event, err error) {
|
||||
rows, err := tx.Query(
|
||||
`
|
||||
SELECT event_type, event_timestamp
|
||||
FROM service_event
|
||||
WHERE service_id = $1
|
||||
ORDER BY service_event_id ASC
|
||||
FROM endpoint_events
|
||||
WHERE endpoint_id = $1
|
||||
ORDER BY endpoint_event_id ASC
|
||||
LIMIT $2 OFFSET $3
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
pageSize,
|
||||
(page-1)*pageSize,
|
||||
)
|
||||
@ -545,16 +545,16 @@ func (s *Store) getEventsByServiceID(tx *sql.Tx, serviceID int64, page, pageSize
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) getResultsByServiceID(tx *sql.Tx, serviceID int64, page, pageSize int) (results []*core.Result, err error) {
|
||||
func (s *Store) getEndpointResultsByEndpointID(tx *sql.Tx, endpointID int64, page, pageSize int) (results []*core.Result, err error) {
|
||||
rows, err := tx.Query(
|
||||
`
|
||||
SELECT service_result_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp
|
||||
FROM service_result
|
||||
WHERE service_id = $1
|
||||
ORDER BY service_result_id DESC -- Normally, we'd sort by timestamp, but sorting by service_result_id is faster
|
||||
SELECT endpoint_result_id, success, errors, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp
|
||||
FROM endpoint_results
|
||||
WHERE endpoint_id = $1
|
||||
ORDER BY endpoint_result_id DESC -- Normally, we'd sort by timestamp, but sorting by endpoint_result_id is faster
|
||||
LIMIT $2 OFFSET $3
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
pageSize,
|
||||
(page-1)*pageSize,
|
||||
)
|
||||
@ -576,13 +576,13 @@ func (s *Store) getResultsByServiceID(tx *sql.Tx, serviceID int64, page, pageSiz
|
||||
}
|
||||
// Get condition results
|
||||
args := make([]interface{}, 0, len(idResultMap))
|
||||
query := `SELECT service_result_id, condition, success
|
||||
FROM service_result_condition
|
||||
WHERE service_result_id IN (`
|
||||
query := `SELECT endpoint_result_id, condition, success
|
||||
FROM endpoint_result_conditions
|
||||
WHERE endpoint_result_id IN (`
|
||||
index := 1
|
||||
for serviceResultID := range idResultMap {
|
||||
for endpointResultID := range idResultMap {
|
||||
query += fmt.Sprintf("$%d,", index)
|
||||
args = append(args, serviceResultID)
|
||||
args = append(args, endpointResultID)
|
||||
index++
|
||||
}
|
||||
query = query[:len(query)-1] + ")"
|
||||
@ -593,25 +593,25 @@ func (s *Store) getResultsByServiceID(tx *sql.Tx, serviceID int64, page, pageSiz
|
||||
defer rows.Close() // explicitly defer the close in case an error happens during the scan
|
||||
for rows.Next() {
|
||||
conditionResult := &core.ConditionResult{}
|
||||
var serviceResultID int64
|
||||
if err = rows.Scan(&serviceResultID, &conditionResult.Condition, &conditionResult.Success); err != nil {
|
||||
var endpointResultID int64
|
||||
if err = rows.Scan(&endpointResultID, &conditionResult.Condition, &conditionResult.Success); err != nil {
|
||||
return
|
||||
}
|
||||
idResultMap[serviceResultID].ConditionResults = append(idResultMap[serviceResultID].ConditionResults, conditionResult)
|
||||
idResultMap[endpointResultID].ConditionResults = append(idResultMap[endpointResultID].ConditionResults, conditionResult)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) getServiceUptime(tx *sql.Tx, serviceID int64, from, to time.Time) (uptime float64, avgResponseTime time.Duration, err error) {
|
||||
func (s *Store) getEndpointUptime(tx *sql.Tx, endpointID int64, from, to time.Time) (uptime float64, avgResponseTime time.Duration, err error) {
|
||||
rows, err := tx.Query(
|
||||
`
|
||||
SELECT SUM(total_executions), SUM(successful_executions), SUM(total_response_time)
|
||||
FROM service_uptime
|
||||
WHERE service_id = $1
|
||||
FROM endpoint_uptimes
|
||||
WHERE endpoint_id = $1
|
||||
AND hour_unix_timestamp >= $2
|
||||
AND hour_unix_timestamp <= $3
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
from.Unix(),
|
||||
to.Unix(),
|
||||
)
|
||||
@ -629,17 +629,17 @@ func (s *Store) getServiceUptime(tx *sql.Tx, serviceID int64, from, to time.Time
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) getServiceAverageResponseTime(tx *sql.Tx, serviceID int64, from, to time.Time) (int, error) {
|
||||
func (s *Store) getEndpointAverageResponseTime(tx *sql.Tx, endpointID int64, from, to time.Time) (int, error) {
|
||||
rows, err := tx.Query(
|
||||
`
|
||||
SELECT SUM(total_executions), SUM(total_response_time)
|
||||
FROM service_uptime
|
||||
WHERE service_id = $1
|
||||
FROM endpoint_uptimes
|
||||
WHERE endpoint_id = $1
|
||||
AND total_executions > 0
|
||||
AND hour_unix_timestamp >= $2
|
||||
AND hour_unix_timestamp <= $3
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
from.Unix(),
|
||||
to.Unix(),
|
||||
)
|
||||
@ -656,17 +656,17 @@ func (s *Store) getServiceAverageResponseTime(tx *sql.Tx, serviceID int64, from,
|
||||
return int(float64(totalResponseTime) / float64(totalExecutions)), nil
|
||||
}
|
||||
|
||||
func (s *Store) getServiceHourlyAverageResponseTimes(tx *sql.Tx, serviceID int64, from, to time.Time) (map[int64]int, error) {
|
||||
func (s *Store) getEndpointHourlyAverageResponseTimes(tx *sql.Tx, endpointID int64, from, to time.Time) (map[int64]int, error) {
|
||||
rows, err := tx.Query(
|
||||
`
|
||||
SELECT hour_unix_timestamp, total_executions, total_response_time
|
||||
FROM service_uptime
|
||||
WHERE service_id = $1
|
||||
FROM endpoint_uptimes
|
||||
WHERE endpoint_id = $1
|
||||
AND total_executions > 0
|
||||
AND hour_unix_timestamp >= $2
|
||||
AND hour_unix_timestamp <= $3
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
from.Unix(),
|
||||
to.Unix(),
|
||||
)
|
||||
@ -683,59 +683,59 @@ func (s *Store) getServiceHourlyAverageResponseTimes(tx *sql.Tx, serviceID int64
|
||||
return hourlyAverageResponseTimes, nil
|
||||
}
|
||||
|
||||
func (s *Store) getServiceID(tx *sql.Tx, service *core.Service) (int64, error) {
|
||||
func (s *Store) getEndpointID(tx *sql.Tx, endpoint *core.Endpoint) (int64, error) {
|
||||
var id int64
|
||||
err := tx.QueryRow("SELECT service_id FROM service WHERE service_key = $1", service.Key()).Scan(&id)
|
||||
err := tx.QueryRow("SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1", endpoint.Key()).Scan(&id)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return 0, common.ErrServiceNotFound
|
||||
return 0, common.ErrEndpointNotFound
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *Store) getNumberOfEventsByServiceID(tx *sql.Tx, serviceID int64) (int64, error) {
|
||||
func (s *Store) getNumberOfEventsByEndpointID(tx *sql.Tx, endpointID int64) (int64, error) {
|
||||
var numberOfEvents int64
|
||||
err := tx.QueryRow("SELECT COUNT(1) FROM service_event WHERE service_id = $1", serviceID).Scan(&numberOfEvents)
|
||||
err := tx.QueryRow("SELECT COUNT(1) FROM endpoint_events WHERE endpoint_id = $1", endpointID).Scan(&numberOfEvents)
|
||||
return numberOfEvents, err
|
||||
}
|
||||
|
||||
func (s *Store) getNumberOfResultsByServiceID(tx *sql.Tx, serviceID int64) (int64, error) {
|
||||
func (s *Store) getNumberOfResultsByEndpointID(tx *sql.Tx, endpointID int64) (int64, error) {
|
||||
var numberOfResults int64
|
||||
err := tx.QueryRow("SELECT COUNT(1) FROM service_result WHERE service_id = $1", serviceID).Scan(&numberOfResults)
|
||||
err := tx.QueryRow("SELECT COUNT(1) FROM endpoint_results WHERE endpoint_id = $1", endpointID).Scan(&numberOfResults)
|
||||
return numberOfResults, err
|
||||
}
|
||||
|
||||
func (s *Store) getAgeOfOldestServiceUptimeEntry(tx *sql.Tx, serviceID int64) (time.Duration, error) {
|
||||
func (s *Store) getAgeOfOldestEndpointUptimeEntry(tx *sql.Tx, endpointID int64) (time.Duration, error) {
|
||||
rows, err := tx.Query(
|
||||
`
|
||||
SELECT hour_unix_timestamp
|
||||
FROM service_uptime
|
||||
WHERE service_id = $1
|
||||
FROM endpoint_uptimes
|
||||
WHERE endpoint_id = $1
|
||||
ORDER BY hour_unix_timestamp
|
||||
LIMIT 1
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
var oldestServiceUptimeUnixTimestamp int64
|
||||
var oldestEndpointUptimeUnixTimestamp int64
|
||||
var found bool
|
||||
for rows.Next() {
|
||||
_ = rows.Scan(&oldestServiceUptimeUnixTimestamp)
|
||||
_ = rows.Scan(&oldestEndpointUptimeUnixTimestamp)
|
||||
found = true
|
||||
}
|
||||
if !found {
|
||||
return 0, errNoRowsReturned
|
||||
}
|
||||
return time.Since(time.Unix(oldestServiceUptimeUnixTimestamp, 0)), nil
|
||||
return time.Since(time.Unix(oldestEndpointUptimeUnixTimestamp, 0)), nil
|
||||
}
|
||||
|
||||
func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (bool, error) {
|
||||
func (s *Store) getLastEndpointResultSuccessValue(tx *sql.Tx, endpointID int64) (bool, error) {
|
||||
var success bool
|
||||
err := tx.QueryRow("SELECT success FROM service_result WHERE service_id = $1 ORDER BY service_result_id DESC LIMIT 1", serviceID).Scan(&success)
|
||||
err := tx.QueryRow("SELECT success FROM endpoint_results WHERE endpoint_id = $1 ORDER BY endpoint_result_id DESC LIMIT 1", endpointID).Scan(&success)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, errNoRowsReturned
|
||||
@ -745,47 +745,47 @@ func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (b
|
||||
return success, nil
|
||||
}
|
||||
|
||||
// deleteOldServiceEvents deletes old service events that are no longer needed
|
||||
func (s *Store) deleteOldServiceEvents(tx *sql.Tx, serviceID int64) error {
|
||||
// deleteOldEndpointEvents deletes endpoint events that are no longer needed
|
||||
func (s *Store) deleteOldEndpointEvents(tx *sql.Tx, endpointID int64) error {
|
||||
_, err := tx.Exec(
|
||||
`
|
||||
DELETE FROM service_event
|
||||
WHERE service_id = $1
|
||||
AND service_event_id NOT IN (
|
||||
SELECT service_event_id
|
||||
FROM service_event
|
||||
WHERE service_id = $1
|
||||
ORDER BY service_event_id DESC
|
||||
DELETE FROM endpoint_events
|
||||
WHERE endpoint_id = $1
|
||||
AND endpoint_event_id NOT IN (
|
||||
SELECT endpoint_event_id
|
||||
FROM endpoint_events
|
||||
WHERE endpoint_id = $1
|
||||
ORDER BY endpoint_event_id DESC
|
||||
LIMIT $2
|
||||
)
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
common.MaximumNumberOfEvents,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// deleteOldServiceResults deletes old service results that are no longer needed
|
||||
func (s *Store) deleteOldServiceResults(tx *sql.Tx, serviceID int64) error {
|
||||
// deleteOldEndpointResults deletes endpoint results that are no longer needed
|
||||
func (s *Store) deleteOldEndpointResults(tx *sql.Tx, endpointID int64) error {
|
||||
_, err := tx.Exec(
|
||||
`
|
||||
DELETE FROM service_result
|
||||
WHERE service_id = $1
|
||||
AND service_result_id NOT IN (
|
||||
SELECT service_result_id
|
||||
FROM service_result
|
||||
WHERE service_id = $1
|
||||
ORDER BY service_result_id DESC
|
||||
DELETE FROM endpoint_results
|
||||
WHERE endpoint_id = $1
|
||||
AND endpoint_result_id NOT IN (
|
||||
SELECT endpoint_result_id
|
||||
FROM endpoint_results
|
||||
WHERE endpoint_id = $1
|
||||
ORDER BY endpoint_result_id DESC
|
||||
LIMIT $2
|
||||
)
|
||||
`,
|
||||
serviceID,
|
||||
endpointID,
|
||||
common.MaximumNumberOfResults,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) deleteOldUptimeEntries(tx *sql.Tx, serviceID int64, maxAge time.Time) error {
|
||||
_, err := tx.Exec("DELETE FROM service_uptime WHERE service_id = $1 AND hour_unix_timestamp < $2", serviceID, maxAge.Unix())
|
||||
func (s *Store) deleteOldUptimeEntries(tx *sql.Tx, endpointID int64, maxAge time.Time) error {
|
||||
_, err := tx.Exec("DELETE FROM endpoint_uptimes WHERE endpoint_id = $1 AND hour_unix_timestamp < $2", endpointID, maxAge.Unix())
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user