Migrate data from an SQL Server table to another, without locking it
Here is the scenario:
- you have two tables: a source of recent data and an SQL Server destination table with the same structure that has to be updated with that data
- they are both on the same server (we assume that if the data source is external you have copied it into a local temporary table)
- the destination table is in active use
- the source table may also be in active use
- the tables are large, so locking them and waiting for the merge to finish is not an option
- the difference between the tables is usually small. Not so small as to be irrelevant, but small relative to the size of the tables
- we don't care about full data consistency (any intermediate state between the current and the updated one is acceptable)
There are some quirks in SQL Server that affect our task, mainly that if a single statement acquires about 5000 or more locks on a table, SQL Server will try to escalate from row locks to a table lock, making the table unusable during the merge. Also, for operations involving a lot of data, the transaction log of the database will grow to accommodate all of the changes until the transaction is committed and the log space can be reused. This takes a lot of disk space, and if anything happens during the operation, the entire operation will be rolled back, which also takes a lot of time, blocking the database.
The first idea is to find rows that are different, new or missing and just sync 5000 of them at a time. One can do this with a MERGE or with classic INSERT, UPDATE and DELETE operations. And this works, but it has a major flaw: after the first 5000 rows have been found and synchronized, the next operation has to compare the tables from the start again to find the next 5000 rows, and again, and again, and again. Every batch repeats the comparison, so the total execution time grows with the number of batches multiplied by the cost of comparing the two tables.
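To make the flaw concrete, here is a minimal sketch of that naive approach, showing the delete step only. The temporary tables #Src and #Dest and their Id and Name columns are hypothetical stand-ins, not part of the procedure described in this post.

```sql
-- Hypothetical demo tables; the names and columns are assumptions.
DROP TABLE IF EXISTS #Src, #Dest;
CREATE TABLE #Src  (Id INT PRIMARY KEY, Name NVARCHAR(50));
CREATE TABLE #Dest (Id INT PRIMARY KEY, Name NVARCHAR(50));
INSERT INTO #Src  (Id, Name) VALUES (1, N'a'), (2, N'b');
INSERT INTO #Dest (Id, Name) VALUES (1, N'a'), (2, N'b'), (3, N'stale'), (4, N'stale');

DECLARE @rows INT = 1;

WHILE (@rows > 0)
BEGIN
    -- Every iteration has to compare destination rows against the source
    -- again, including rows already known to be in sync, just to find
    -- the next batch of rows to remove.
    DELETE TOP (4900) tgt
    FROM #Dest AS tgt
    WHERE NOT EXISTS (SELECT 1 FROM #Src AS src WHERE src.Id = tgt.Id);

    SET @rows = @@ROWCOUNT;
END

-- Rows left in the destination after the loop
SELECT Id, Name FROM #Dest;
```

On a table this small the repeated comparison costs nothing, but with millions of rows and many batches the same work is redone on every pass through the loop.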
What we actually need is to take a list of changes and apply them in batches, kind of like an artificial transaction log. The following stored procedure takes two parameters: the schema-qualified name of the source table and the schema-qualified name of the destination table. It creates a log of changes, then takes 4900 of them at a time and applies them to the destination table. The only restrictions are that the destination table must have a primary key and that the source and destination tables have the same columns. For performance reasons, it's best that the source table also has the same primary key, or at least an index on the same columns. The usage would look like: EXEC usp_MergeTables 'SomeSchemaIncludingDbo.SourceTable','MaybeSomeOtherSchema.DestinationTable'
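As a fuller usage sketch, the script below sets up two small tables, runs the procedure and checks the result. It assumes usp_MergeTables has already been created in the current database; the table names dbo.MergeDemoSrc and dbo.MergeDemoDest and their columns are hypothetical.

```sql
-- Hypothetical demo tables; names and columns are assumptions.
DROP TABLE IF EXISTS dbo.MergeDemoSrc, dbo.MergeDemoDest;
CREATE TABLE dbo.MergeDemoSrc  (Id INT PRIMARY KEY, Name NVARCHAR(50) NULL);
CREATE TABLE dbo.MergeDemoDest (Id INT PRIMARY KEY, Name NVARCHAR(50) NULL);

INSERT INTO dbo.MergeDemoSrc  (Id, Name)
VALUES (1, N'same'), (2, N'changed'), (3, N'new');
INSERT INTO dbo.MergeDemoDest (Id, Name)
VALUES (1, N'same'), (2, N'old'), (4, N'obsolete');

-- Source first, destination second
EXEC usp_MergeTables 'dbo.MergeDemoSrc', 'dbo.MergeDemoDest';

-- If the merge worked, neither query should return any rows.
SELECT * FROM dbo.MergeDemoSrc  EXCEPT SELECT * FROM dbo.MergeDemoDest;
SELECT * FROM dbo.MergeDemoDest EXCEPT SELECT * FROM dbo.MergeDemoSrc;
```

Permanent tables are used here rather than temporary ones because the procedure looks the table names up in sys.tables of the current database.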
I will explain what it does after the code:
-- this is for batched merge (del, upd, ins) of the data of a table into another (nolock/readpast for reduced locking of source and destination tables)
CREATE OR ALTER PROC usp_MergeTables(@SourceTable NVARCHAR(256),@DestinationTable NVARCHAR(256))
AS
BEGIN
SET NOCOUNT ON
IF NOT EXISTS(SELECT *
FROM sys.schemas s
INNER JOIN sys.tables t
ON s.schema_id=t.schema_id
WHERE REPLACE(REPLACE(@SourceTable,']',''),'[','')=s.name+'.'+t.name)
BEGIN
DECLARE @Err1 NVARCHAR(400) = N'Source table '+@SourceTable+N' not found!' -- wide enough for a 256-character name
;THROW 50404,@Err1,1
END
SELECT CAST(s.name as NVARCHAR(Max)) as schemaName,CAST(t.name as NVARCHAR(Max)) as tableName,CAST(c.name as NVARCHAR(Max)) as columnName,c.is_computed,c.is_identity,
CASE
WHEN tp.name IN ( 'varchar', 'char', 'varbinary' ) THEN tp.name +
CASE WHEN c.max_length = -1 THEN '(max)'
ELSE '(' + CAST(c.max_length AS VARCHAR(4)) + ')' END
--types that have an unicode character type that requires length to be halved
WHEN tp.name IN ( 'nvarchar', 'nchar' ) THEN tp.name +
CASE WHEN c.max_length = -1 THEN '(max)'
ELSE '(' + CAST(c.max_length / 2 AS VARCHAR(4)) + ')'
END
--types with a datetime precision
WHEN tp.name IN ( 'time', 'datetime2', 'datetimeoffset' ) THEN tp.name +
'(' + CAST(c.scale AS VARCHAR(4)) + ')'
--types with a precision/scale
WHEN tp.name IN ( 'numeric', 'decimal' )
THEN tp.name + '(' + CAST(c.precision AS VARCHAR(4)) + ',' +
CAST(c.scale AS VARCHAR(4)) + ')'
--timestamp should be reported as rowversion
WHEN tp.name = 'timestamp' THEN 'rowversion'
--and the rest. Note, float is declared with a bit length, but is
--represented as either float or real in types
ELSE tp.name
END as typeName,
ISNULL(( SELECT pk.is_primary_key FROM sys.indexes pk
INNER JOIN sys.index_columns ic
ON ic.object_id = pk.object_id
AND ic.index_id = pk.index_id
AND c.column_id=ic.column_id
WHERE t.object_id = pk.object_id
AND pk.is_primary_key = 1
),0) as is_primary_key
INTO #tgtColumns
FROM sys.schemas s
INNER JOIN sys.tables t
ON s.schema_id=t.schema_id
INNER JOIN sys.all_columns c
ON t.object_id=c.object_id
INNER JOIN sys.types tp
ON c.system_type_id=tp.system_type_id
AND c.user_type_id=tp.user_type_id
WHERE REPLACE(REPLACE(@DestinationTable,']',''),'[','')=s.name+'.'+t.name
IF NOT EXISTS(SELECT * FROM #tgtColumns)
BEGIN
DECLARE @Err2 NVARCHAR(400) = N'Destination table '+@DestinationTable+N' not found!' -- wide enough for a 256-character name
;THROW 50404,@Err2,2
END
IF NOT EXISTS(SELECT * FROM #tgtColumns WHERE is_primary_key=1)
BEGIN
DECLARE @Err3 NVARCHAR(400) = N'Destination table '+@DestinationTable+N' has no primary keys!' -- wide enough for a 256-character name
;THROW 50404,@Err3,3
END
DECLARE @operSql NVARCHAR(Max)
DECLARE @delSql NVARCHAR(Max)
DECLARE @updSql NVARCHAR(Max)
DECLARE @insSql NVARCHAR(Max)
DECLARE @identityInsertOn NVARCHAR(Max)=''
DECLARE @identityInsertOff NVARCHAR(Max)=''
IF EXISTS(SELECT * FROM #tgtColumns WHERE is_identity=1)
BEGIN
SET @identityInsertOn=CONCAT(N'
SET IDENTITY_INSERT ',@DestinationTable,N' ON
')
SET @identityInsertOff=CONCAT(N'
SET IDENTITY_INSERT ',@DestinationTable,N' OFF
')
END
SELECT @operSql = CONCAT(N'DROP TABLE IF EXISTS #oper
SELECT *
INTO #oper
FROM (
SELECT ',STRING_AGG(CONCAT(N'ISNULL(src.[',c.columnName,N'],tgt.[',c.columnName,N']) as [',c.columnName,N']'),N', '),N',
CASE
WHEN ',STRING_AGG(CONCAT(N'src.[',c.columnName,N'] IS NULL'),N' AND '),N' THEN ''DEL''
WHEN ',STRING_AGG(CONCAT(N'tgt.[',c.columnName,N'] IS NULL'),N' AND '),N' THEN ''INS''
WHEN (
SELECT * FROM ',@SourceTable,N' R
WHERE ',STRING_AGG(CONCAT('R.[',c.columnName,N'] = src.[',c.columnName,N']'),N' AND '),N'
FOR XML PATH(''Row''), ELEMENTS XSINIL
) <> (
SELECT * FROM ',@DestinationTable,N' R
WHERE ',STRING_AGG(CONCAT('R.[',c.columnName,N'] = tgt.[',c.columnName,N']'),N' AND '),N'
FOR XML PATH(''Row''), ELEMENTS XSINIL
) THEN ''UPD''
END as __oper__
FROM ',@SourceTable,N' src (NOLOCK)
FULL OUTER JOIN ',@DestinationTable,N' tgt (NOLOCK)
ON ',STRING_AGG(CONCAT('src.[',c.columnName,N'] = tgt.[',c.columnName,N']'),N' AND '),'
) x
WHERE __oper__ IS NOT NULL
CREATE INDEX temp_id ON #oper(',STRING_AGG(CONCAT('[',c.columnName,N']'),N', '),N')
CREATE INDEX temp_oper ON #oper(__oper__)')
FROM #tgtColumns c
WHERE c.is_primary_key=1
SELECT @delSql = CONCAT(N'
DECLARE @batch TABLE(',STRING_AGG(CONCAT('[',c.columnName,N'] ',c.typeName),N', '),N',
PRIMARY KEY(',STRING_AGG(CONCAT('[',c.columnName,N']'),N', '),N'))
DECLARE @ROWS INT = 1
WHILE (@ROWS>0)
BEGIN
DELETE TOP (4900) tgt
OUTPUT ',STRING_AGG(CONCAT('deleted.[',c.columnName,N']'),N', '),N'
INTO @batch
FROM ',@DestinationTable,N' tgt (READPAST)
INNER JOIN #oper src
ON ',STRING_AGG(CONCAT('src.[',c.columnName,N'] = tgt.[',c.columnName,N']'),N' AND '),N'
WHERE src.__oper__=''DEL''
SET @ROWS=@@ROWCOUNT
DELETE o
FROM #oper o
INNER JOIN @batch b
ON ',STRING_AGG(CONCAT('o.[',c.columnName,N'] = b.[',c.columnName,N']'),N' AND '),N'
DELETE FROM @batch
IF (@ROWS=0)
SELECT @ROWS=COUNT(*) FROM #oper WHERE __oper__=''DEL''
END')
FROM #tgtColumns c
WHERE c.is_primary_key=1
SELECT @updSql = CONCAT(N'
SET @ROWS = 1
WHILE (@ROWS>0)
BEGIN
UPDATE tgt
SET ',(SELECT STRING_AGG(CONCAT('[',c.columnName,N'] = src.[',c.columnName,N']'),N', ')
FROM #tgtColumns c
WHERE c.is_primary_key=0 AND c.is_computed=0),N' OUTPUT ',STRING_AGG(CONCAT('inserted.[',c.columnName,N']'),N', '),N'
INTO @batch
FROM ',@DestinationTable,N' tgt (READPAST)
INNER JOIN (
SELECT TOP (4900) s.*
FROM #oper o
INNER JOIN ',@SourceTable,N' s
ON ',STRING_AGG(CONCAT('s.[',c.columnName,N'] = o.[',c.columnName,N']'),N' AND '),N'
WHERE __oper__=''UPD''
) src
ON ',STRING_AGG(CONCAT('src.[',c.columnName,N'] = tgt.[',c.columnName,N']'),N' AND '),N'
SET @ROWS=@@ROWCOUNT
DELETE o
FROM #oper o
INNER JOIN @batch b
ON ',STRING_AGG(CONCAT('o.[',c.columnName,N'] = b.[',c.columnName,N']'),N' AND '),N'
DELETE FROM @batch
IF (@ROWS=0)
SELECT @ROWS=COUNT(*) FROM #oper WHERE __oper__=''UPD''
END')
FROM #tgtColumns c
WHERE c.is_primary_key=1
SELECT @insSql = CONCAT(N'
SET @ROWS = 1
WHILE (@ROWS>0)
BEGIN
INSERT INTO ',@DestinationTable,N'(',(SELECT STRING_AGG(CONCAT('[',c.columnName,N']'),N', ')
FROM #tgtColumns c
WHERE c.is_computed=0),N') OUTPUT ',STRING_AGG(CONCAT('inserted.[',c.columnName,N']'),N', '),N'
INTO @batch
SELECT TOP (4900) ',(SELECT STRING_AGG(CONCAT('s.[',c.columnName,N']'),N', ')
FROM #tgtColumns c
WHERE c.is_computed=0),N'
FROM #oper o
INNER JOIN ',@SourceTable,N' s
ON ',STRING_AGG(CONCAT('s.[',c.columnName,N'] = o.[',c.columnName,N']'),N' AND '),N'
WHERE __oper__=''INS''
SET @ROWS=@@ROWCOUNT
DELETE o
FROM #oper o
INNER JOIN @batch b
ON ',STRING_AGG(CONCAT('o.[',c.columnName,N'] = b.[',c.columnName,N']'),N' AND '),N'
DELETE FROM @batch
IF (@ROWS=0)
SELECT @ROWS=COUNT(*) FROM #oper WHERE __oper__=''INS''
END
DROP TABLE #oper
')
FROM #tgtColumns c
WHERE c.is_primary_key=1
DROP TABLE #tgtColumns
--PRINT @operSql
--PRINT @delSql
--PRINT @updSql
--PRINT @identityInsertOn
--PRINT @identityInsertOff
--PRINT @insSql
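-- variable names must match their declared casing on case-sensitive collations
DECLARE @sql NVARCHAR(Max) = CONCAT(@operSql, @delSql, @identityInsertOn, @updSql, @insSql, @identityInsertOff)
-- sp_executesql is the documented way to run a dynamic NVARCHAR batch
EXEC sp_executesql @sql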
END
OK, this is a large thing, but the principles used are simple:
- first we create a table containing information about the columns of the destination table: schema, table, column name, whether the column is a primary key, computed or identity column, and the type name. The schema and table name are not used, but may be useful for debugging. Note that this SP doesn't check that the tables have the same number and type of columns. That's on you to ensure.
- using the column information we create four strings that will contain the SQL for the following operations:
  - create an "operations table"
  - delete rows that are not needed
  - update rows that need to be updated
  - insert rows that are missing
- there are four strings mostly for debugging purposes, to keep each of them below the length that PRINT can output at a time (4000 characters for NVARCHAR strings, 8000 for VARCHAR ones), but they are concatenated and executed as one.
- implementation details:
  - we use FOR XML PATH('Row'), ELEMENTS XSINIL to generate a string with all the data in each row, so we don't have to compare rows column by column. We could have made this work with comparisons, but the code would have been bulky and ugly when comparing for NULL or for having values. ELEMENTS XSINIL will ensure that there is a difference between an empty string and NULL.
  - a FULL OUTER JOIN is used to find (based on the primary key columns of the destination table) whether rows need to be deleted, updated or inserted. That operation is specified in the __oper__ column
  - the operations table is thus created, containing the primary key values and the operation required, with two indexes: one on the primary keys and one on the operation. These indexes are not really that relevant, so one could choose to remove them.
  - a @batch table variable is used with a PRIMARY KEY on the primary key columns of the destination table, for performance reasons
  - the batching is done via the OUTPUT clause: each DELETE, UPDATE or INSERT statement handles at most 4900 rows and outputs their primary keys into the @batch table, which is then used to remove the processed rows from the operations table
  - for each segment we either delete destination rows with the same primary keys for 'DEL', update destination rows with the same primary keys for 'UPD' or insert source rows with the same primary keys for 'INS'
  - before the update and insert steps we set IDENTITY_INSERT to ON, which is what allows the insert to write explicit identity values, and at the end we set it back to OFF, if there are any identity columns
  - if some rows were affected by the operation, then we continue in the same segment. If not, then we look in the operations table to see if there are still rows to be processed. No rows may be affected while there are still rows to be processed due to the locking avoidance hints
- improvements to avoid locking:
  - when we generate the operations table we use NOLOCK, which reads uncommitted values ignoring locks. This may not be what you want, but if the source table is locked for whatever reason, this ensures the merge operation is not blocked
  - when we process the batches we use READPAST, which ignores locked destination rows. This ensures that rows that can be updated will be, while the others can be done later, meaning that if you have 1 locked row, the merge operation will go around it, then keep retrying until it is unlocked to update or delete it.
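The row comparison trick can be tried in isolation. The sketch below uses two hypothetical one-row table variables (the names @a and @b and their columns are assumptions) that differ only in NULL versus an empty string, and compares them the same way the generated SQL does.

```sql
-- Hypothetical one-row tables; names and columns are assumptions.
DECLARE @a TABLE (Id INT, Name NVARCHAR(50));
DECLARE @b TABLE (Id INT, Name NVARCHAR(50));
INSERT INTO @a (Id, Name) VALUES (1, NULL);
INSERT INTO @b (Id, Name) VALUES (1, N'');

SELECT
    -- the whole row serialized as a single string
    (SELECT * FROM @a FOR XML PATH('Row'), ELEMENTS XSINIL) AS RowA,
    (SELECT * FROM @b FOR XML PATH('Row'), ELEMENTS XSINIL) AS RowB,
    -- the same string comparison the procedure uses to detect 'UPD' rows
    CASE WHEN (SELECT * FROM @a FOR XML PATH('Row'), ELEMENTS XSINIL)
           <> (SELECT * FROM @b FOR XML PATH('Row'), ELEMENTS XSINIL)
         THEN 'different' ELSE 'same' END AS Verdict;
```

With XSINIL the NULL column is written out as an element marked xsi:nil, so the two strings are not equal and the row would be flagged for update.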
If you want to see what the generated SQL looks like, uncomment the PRINT lines and comment out the EXEC one.
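As an illustration, for a hypothetical pair of tables with a single primary key column Id, the first part of the generated script (the one that builds the operations table) should look roughly like the sketch below. The table names, columns and sample data are assumptions, and the whitespace has been tidied by hand.

```sql
-- Hypothetical demo tables; not part of the generated script.
DROP TABLE IF EXISTS dbo.DemoSrc, dbo.DemoDest;
CREATE TABLE dbo.DemoSrc  (Id INT PRIMARY KEY, Name NVARCHAR(50) NULL);
CREATE TABLE dbo.DemoDest (Id INT PRIMARY KEY, Name NVARCHAR(50) NULL);
INSERT INTO dbo.DemoSrc  (Id, Name) VALUES (1, N'same'), (2, N'changed'), (3, N'new');
INSERT INTO dbo.DemoDest (Id, Name) VALUES (1, N'same'), (2, N'old'), (4, N'obsolete');

-- Approximately what @operSql contains for these two tables
DROP TABLE IF EXISTS #oper
SELECT *
INTO #oper
FROM (
    SELECT ISNULL(src.[Id],tgt.[Id]) as [Id],
        CASE
            WHEN src.[Id] IS NULL THEN 'DEL'
            WHEN tgt.[Id] IS NULL THEN 'INS'
            WHEN (
                SELECT * FROM dbo.DemoSrc R
                WHERE R.[Id] = src.[Id]
                FOR XML PATH('Row'), ELEMENTS XSINIL
            ) <> (
                SELECT * FROM dbo.DemoDest R
                WHERE R.[Id] = tgt.[Id]
                FOR XML PATH('Row'), ELEMENTS XSINIL
            ) THEN 'UPD'
        END as __oper__
    FROM dbo.DemoSrc src (NOLOCK)
    FULL OUTER JOIN dbo.DemoDest tgt (NOLOCK)
        ON src.[Id] = tgt.[Id]
) x
WHERE __oper__ IS NOT NULL
CREATE INDEX temp_id ON #oper([Id])
CREATE INDEX temp_oper ON #oper(__oper__)

-- One row per pending change: key value plus 'DEL', 'UPD' or 'INS'
SELECT * FROM #oper ORDER BY [Id];
```

With this sample data the operations table should hold three rows: Id 2 as 'UPD', Id 3 as 'INS' and Id 4 as 'DEL'. Id 1 is identical in both tables, so it is filtered out.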
Now, I just wrote this stored procedure, so I may have missed some cases. Let me know if you find a situation where this doesn't work as expected.
Hope it helps!