Modified copy and delete record to sql routines to lock the queue record before attempting to prevent long running copy or delete calls form being processed more than once and wasting resources.
This commit is contained in:
@ -1,6 +1,4 @@
|
||||
Compile subroutine Copy_Pending_Records_To_SQL(VOID)
|
||||
#pragma precomp SRP_PreCompiler
|
||||
|
||||
/*****************************************************************************\
|
||||
This is a utility function. For each record found in SQL_PENDING, it makes
|
||||
sure the identified record is copied to SQL.
|
||||
@ -9,98 +7,104 @@ Compile subroutine Copy_Pending_Records_To_SQL(VOID)
|
||||
-------
|
||||
02/06/2023 DJS Adapted from Copy_Backlog_Records_To_SQL
|
||||
\*****************************************************************************/
|
||||
|
||||
#pragma precomp SRP_PreCompiler
|
||||
$Insert APP_INSERTS
|
||||
|
||||
EQU FIVE_MINUTES$ to 0.003472
|
||||
EQU SECONDS_PER_DAY$ to 86400
|
||||
EQU THIRTY_MINUTES$ to 1800
|
||||
|
||||
Declare subroutine Copy_Record_To_SQL, Delete_Record_From_SQL, SRP_TcpClient, Database_Services
|
||||
Declare function GetTickCount, Database_Services, SRP_TcpClient, Datetime, Database_Services, Environment_Services
|
||||
|
||||
// This only needs to be called every once in a while (e.g. 5 minutes ~ 0.003472)
|
||||
Run = False$
|
||||
CurrDtm = Datetime()
|
||||
LastRunDtm = Database_Services('ReadDataRow', 'APP_INFO', 'SQL_PENDING_LAST_RUNTIME')
|
||||
If LastRunDtm NE '' then
|
||||
TimeDiff = CurrDtm - LastRunDtm
|
||||
If TimeDiff GT 0.003472 then
|
||||
Run = True$
|
||||
LastRunDtm = CurrDtm
|
||||
end
|
||||
end else
|
||||
Run = True$
|
||||
LastRunDtm = CurrDtm
|
||||
end
|
||||
Main:
|
||||
|
||||
If Run then
|
||||
Database_Services('WriteDataRow', 'APP_INFO', 'SQL_PENDING_LAST_RUNTIME', LastRunDtm)
|
||||
hSysLists = Database_Services('GetTableHandle', 'SYSLISTS')
|
||||
Lock hSysLists, 'Copy_Pending_Records_To_SQL' then
|
||||
// This only needs to be called every once in a while (e.g. 5 minutes ~ 0.003472)
|
||||
Run = False$
|
||||
CurrDtm = Datetime()
|
||||
LastRunDtm = Database_Services('ReadDataRow', 'APP_INFO', 'SQL_PENDING_LAST_RUNTIME')
|
||||
If LastRunDtm NE '' then
|
||||
TimeDiff = CurrDtm - LastRunDtm
|
||||
If TimeDiff GT FIVE_MINUTES$ then
|
||||
Run = True$
|
||||
LastRunDtm = CurrDtm
|
||||
end
|
||||
end else
|
||||
Run = True$
|
||||
LastRunDtm = CurrDtm
|
||||
end
|
||||
|
||||
NonCriticalTables = Database_Services('ReadDataRow', 'APP_INFO', 'NONCRITICAL_SQL_TABLES')
|
||||
Open "SQL_PENDING" to pTable then
|
||||
|
||||
ClearSelect
|
||||
Select pTable
|
||||
Done = False$
|
||||
Loop
|
||||
ReadNext Key else Done = True$
|
||||
Command = ''
|
||||
Until Done
|
||||
|
||||
Read Rec from pTable, Key then
|
||||
TransDtm = Rec<1>
|
||||
ElapSecs = (CurrDtm - TransDtm) * 86400
|
||||
If ElapSecs LT 1800 then
|
||||
// Retry recent pending transactions (1800 seconds = 30 minutes)
|
||||
Table = Key[1, "*"]
|
||||
Locate Table in NonCriticalTables using @FM setting Dummy then
|
||||
ID = Key[Col2() + 1, Len(Key)]
|
||||
Begin Case
|
||||
Case ID[-7, 7] EQ "*DELETE"
|
||||
ID[-7, 7] = ""
|
||||
Command := 'RUN DELETE_RECORD_FROM_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
Case ID[-6, 6] EQ "*WRITE"
|
||||
ID[-6, 6] = ""
|
||||
Command := 'RUN COPY_RECORD_TO_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
End Case
|
||||
end else
|
||||
TempKey = Key
|
||||
TempKey[1, Len(Table) + 1] = ''
|
||||
TransTime = TempKey[-1, 'B*']
|
||||
TempKey[-1, Neg(Len(TransTime) + 1)] = ''
|
||||
TransDate = TempKey[-1, 'B*']
|
||||
TempKey[-1, Neg(Len(TransDate) + 1)] = ''
|
||||
Process = TempKey[-1, 'B*']
|
||||
TempKey[-1, Neg(Len(Process) + 1)] = ''
|
||||
ID = TempKey
|
||||
If Process EQ 'DELETE' then
|
||||
Command := 'RUN DELETE_RECORD_FROM_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
end else
|
||||
Command := 'RUN COPY_RECORD_TO_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
end
|
||||
end
|
||||
|
||||
TcpClientHandle = 0
|
||||
ServerIP = Environment_Services('GetApplicationRootIP')
|
||||
Convert '\' to '' in ServerIP
|
||||
|
||||
If SRP_TcpClient(TcpClientHandle, 'CONNECT', ServerIP, '20779') then
|
||||
SRP_TcpClient(TcpClientHandle, 'SEND_SES', Command)
|
||||
SRP_TcpClient(TcpClientHandle, 'CLOSE_SES')
|
||||
end
|
||||
|
||||
end else
|
||||
// This transaction most likely won't replicate due to a schema mapping issue or because
|
||||
// the record no longer exists in OI, so remove this transaction from the pending queue.
|
||||
Delete pTable, Key else null
|
||||
end
|
||||
end
|
||||
If Run then
|
||||
Database_Services('WriteDataRow', 'APP_INFO', 'SQL_PENDING_LAST_RUNTIME', LastRunDtm)
|
||||
hSysLists = Database_Services('GetTableHandle', 'SYSLISTS')
|
||||
Lock hSysLists, 'Copy_Pending_Records_To_SQL' then
|
||||
|
||||
Repeat
|
||||
end
|
||||
Unlock hSysLists, 'Copy_Pending_Records_To_SQL' else Null
|
||||
end
|
||||
end
|
||||
NonCriticalTables = Database_Services('ReadDataRow', 'APP_INFO', 'NONCRITICAL_SQL_TABLES')
|
||||
Open "SQL_PENDING" to pTable then
|
||||
|
||||
ClearSelect
|
||||
Select pTable
|
||||
Done = False$
|
||||
Loop
|
||||
ReadNext Key else Done = True$
|
||||
Command = ''
|
||||
Until Done
|
||||
|
||||
Read Rec from pTable, Key then
|
||||
TransDtm = Rec<1>
|
||||
ElapSecs = (CurrDtm - TransDtm) * SECONDS_PER_DAY$
|
||||
If ElapSecs LT THIRTY_MINUTES$ then
|
||||
// Retry recent pending transactions (i.e., those submitted in the last 30 minutes)
|
||||
Table = Key[1, "*"]
|
||||
Locate Table in NonCriticalTables using @FM setting Dummy then
|
||||
ID = Key[Col2() + 1, Len(Key)]
|
||||
Begin Case
|
||||
Case ID[-7, 7] EQ "*DELETE"
|
||||
ID[-7, 7] = ""
|
||||
Command := 'RUN DELETE_RECORD_FROM_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
Case ID[-6, 6] EQ "*WRITE"
|
||||
ID[-6, 6] = ""
|
||||
Command := 'RUN COPY_RECORD_TO_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
End Case
|
||||
end else
|
||||
TempKey = Key
|
||||
TempKey[1, Len(Table) + 1] = ''
|
||||
TransTime = TempKey[-1, 'B*']
|
||||
TempKey[-1, Neg(Len(TransTime) + 1)] = ''
|
||||
TransDate = TempKey[-1, 'B*']
|
||||
TempKey[-1, Neg(Len(TransDate) + 1)] = ''
|
||||
Process = TempKey[-1, 'B*']
|
||||
TempKey[-1, Neg(Len(Process) + 1)] = ''
|
||||
ID = TempKey
|
||||
If Process EQ 'DELETE' then
|
||||
Command := 'RUN DELETE_RECORD_FROM_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
end else
|
||||
Command := 'RUN COPY_RECORD_TO_SQL ':Quote(Table):', ':Quote(ID):', 1, ':Quote(Key)
|
||||
end
|
||||
end
|
||||
|
||||
TcpClientHandle = 0
|
||||
ServerIP = Environment_Services('GetApplicationRootIP')
|
||||
ServerPort = Environment_Services('GetScrapeServerPort')
|
||||
Convert '\' to '' in ServerIP
|
||||
|
||||
If SRP_TcpClient(TcpClientHandle, 'CONNECT', ServerIP, ServerPort) then
|
||||
SRP_TcpClient(TcpClientHandle, 'SEND_SES', Command)
|
||||
SRP_TcpClient(TcpClientHandle, 'CLOSE_SES')
|
||||
end
|
||||
|
||||
end else
|
||||
// This transaction most likely won't replicate due to a schema mapping issue or because
|
||||
// the record no longer exists in OI, so remove this transaction from the pending queue.
|
||||
Delete pTable, Key else null
|
||||
end
|
||||
end
|
||||
|
||||
Repeat
|
||||
end
|
||||
Unlock hSysLists, 'Copy_Pending_Records_To_SQL' else Null
|
||||
end
|
||||
end
|
||||
|
||||
Return
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user