From 0a3267e49992f0b03a09d3ab14fecfab172a2750 Mon Sep 17 00:00:00 2001 From: TwinProduction Date: Mon, 12 Jul 2021 02:54:16 -0400 Subject: [PATCH] Reuse transaction on insert to improve performance --- storage/store/database/database.go | 78 +++++++++++++++--------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/storage/store/database/database.go b/storage/store/database/database.go index 574afcf4..f67f1f30 100644 --- a/storage/store/database/database.go +++ b/storage/store/database/database.go @@ -78,6 +78,7 @@ func (s *Store) createSchema() error { service_result_id INTEGER PRIMARY KEY, service_id INTEGER REFERENCES service(id), success INTEGER, + errors TEXT, connected INTEGER, status INTEGER, dns_rcode TEXT, @@ -218,12 +219,16 @@ func (s *Store) getResultsByServiceID(serviceID int64) (results []*core.Result, // Insert adds the observed result for the specified service into the store func (s *Store) Insert(service *core.Service, result *core.Result) { + tx, err := s.db.Begin() + if err != nil { + return + } //start := time.Now() - serviceID, err := s.getServiceID(service) + serviceID, err := s.getServiceID(tx, service) if err != nil { if err == errServiceNotFoundInDatabase { // Service doesn't exist in the database, insert it - if serviceID, err = s.insertService(service); err != nil { + if serviceID, err = s.insertService(tx, service); err != nil { return // failed to insert service } } else { @@ -239,14 +244,14 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { // 2. The lastResult.Success != result.Success. This implies that the service went from healthy to unhealthy or // vice-versa, in which case we will have to create a new event of type EventHealthy or EventUnhealthy // based on result.Success. - numberOfEvents, err := s.getNumberOfEventsByServiceID(serviceID) + numberOfEvents, err := s.getNumberOfEventsByServiceID(tx, serviceID) if err != nil { return } //log.Printf("there are currently %d events", numberOfEvents) if numberOfEvents == 0 { // There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event - err = s.insertEvent(serviceID, &core.Event{ + err = s.insertEvent(tx, serviceID, &core.Event{ Type: core.EventStart, Timestamp: result.Timestamp.Add(-result.Duration), }) @@ -255,7 +260,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Name, service.Group, err.Error()) } event := generateEventBasedOnResult(result) - err = s.insertEvent(serviceID, event) + err = s.insertEvent(tx, serviceID, event) if err != nil { // Silently fail log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Name, service.Group, err.Error()) @@ -263,7 +268,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { } else { // Get the success value of the previous result var lastResultSuccess bool - lastResultSuccess, err = s.getLastServiceResultSuccessValue(serviceID) + lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) if err != nil { log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Name, service.Group, err.Error()) } else { @@ -273,7 +278,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { // an event to mark the change in state if lastResultSuccess != result.Success { event := generateEventBasedOnResult(result) - err = s.insertEvent(serviceID, event) + err = s.insertEvent(tx, serviceID, event) if err != nil { // Silently fail log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Name, service.Group, err.Error()) @@ -282,19 +287,23 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { } } // Second, we need to insert the result. - err = s.insertResult(serviceID, result) + err = s.insertResult(tx, serviceID, result) if err != nil { // Silently fail log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Name, service.Group, err.Error()) } - //log.Printf("[database][Insert] Successfully inserted result in duration=%dms", time.Since(start).Milliseconds()) + //log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) + err = tx.Commit() + if err != nil { + _ = tx.Rollback() + } return } -func (s *Store) getServiceID(service *core.Service) (int64, error) { - rows, err := s.db.Query( - "SELECT service_id FROM service WHERE service_name = $1 AND service_group = $2", - service.Name, +func (s *Store) getServiceID(tx *sql.Tx, service *core.Service) (int64, error) { + rows, err := tx.Query( + "SELECT service_id FROM service WHERE service_key = $1", + service.Key(), service.Group, ) if err != nil { @@ -315,8 +324,9 @@ func (s *Store) getServiceID(service *core.Service) (int64, error) { } // insertService inserts a service in the store and returns the generated id of said service -func (s *Store) insertService(service *core.Service) (int64, error) { - result, err := s.db.Exec( +func (s *Store) insertService(tx *sql.Tx, service *core.Service) (int64, error) { + //log.Printf("[database][insertService] Inserting service with group=%s and name=%s", service.Group, service.Name) + result, err := tx.Exec( "INSERT INTO service (service_key, service_name, service_group) VALUES ($1, $2, $3)", service.Key(), service.Name, @@ -328,8 +338,8 @@ func (s *Store) insertService(service *core.Service) (int64, error) { return result.LastInsertId() } -func (s *Store) getNumberOfEventsByServiceID(serviceID int64) (int64, error) { - rows, err := s.db.Query("SELECT COUNT(1) FROM service_event WHERE service_id = $1", serviceID) +func (s *Store) getNumberOfEventsByServiceID(tx *sql.Tx, serviceID int64) (int64, error) { + rows, err := tx.Query("SELECT COUNT(1) FROM service_event WHERE service_id = $1", serviceID) if err != nil { return 0, err } @@ -347,9 +357,9 @@ func (s *Store) getNumberOfEventsByServiceID(serviceID int64) (int64, error) { return numberOfEvents, nil } -// insertService inserts a service in the store and returns the generated id of said service -func (s *Store) insertEvent(serviceID int64, event *core.Event) error { - _, err := s.db.Exec( +// insertEvent inserts a service event in the store +func (s *Store) insertEvent(tx *sql.Tx, serviceID int64, event *core.Event) error { + _, err := tx.Exec( "INSERT INTO service_event (service_id, event_type, event_timestamp) VALUES ($1, $2, $3)", serviceID, event.Type, @@ -361,8 +371,8 @@ func (s *Store) insertEvent(serviceID int64, event *core.Event) error { return nil } -func (s *Store) getLastServiceResultSuccessValue(serviceID int64) (bool, error) { - rows, err := s.db.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY timestamp LIMIT 1", serviceID) +func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (bool, error) { + rows, err := tx.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY timestamp LIMIT 1", serviceID) if err != nil { return false, err } @@ -380,9 +390,9 @@ func (s *Store) getLastServiceResultSuccessValue(serviceID int64) (bool, error) return success, nil } -// insertService inserts a service in the store and returns the generated id of said service -func (s *Store) insertResult(serviceID int64, result *core.Result) error { - res, err := s.db.Exec(` +// insertResult inserts a result in the store +func (s *Store) insertResult(tx *sql.Tx, serviceID int64, result *core.Result) error { + res, err := tx.Exec(` INSERT INTO service_result (service_id, success, connected, status, dns_rcode, certificate_expiration, hostname, ip, duration, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, serviceID, @@ -403,30 +413,22 @@ func (s *Store) insertResult(serviceID int64, result *core.Result) error { if err != nil { return err } - return s.insertConditionResults(serviceResultID, result.ConditionResults) + return s.insertConditionResults(tx, serviceResultID, result.ConditionResults) } -func (s *Store) insertConditionResults(serviceResultID int64, conditionResults []*core.ConditionResult) error { - transaction, err := s.db.Begin() - if err != nil { - return err - } +func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, conditionResults []*core.ConditionResult) error { + var err error for _, cr := range conditionResults { - _, err = transaction.Exec("INSERT INTO service_result_condition (service_result_id, condition, success) VALUES ($1, $2, $3)", + _, err = tx.Exec("INSERT INTO service_result_condition (service_result_id, condition, success) VALUES ($1, $2, $3)", serviceResultID, cr.Condition, cr.Success, ) if err != nil { - _ = transaction.Rollback() + _ = tx.Rollback() return err } } - err = transaction.Commit() - if err != nil { - _ = transaction.Rollback() - return err - } return nil }