Claus on Code

A data dudes random musings about code, bugs, products, life etc.


How to dynamically copy specific tables from source systems with Azure Data Factory

When you are working on a data project, you often have a list of tables you want to copy to your DWH database or staging database. With Azure Data Factory (ADF), this is easy to set up (kind of). In this blog post, I will show you how. The result will end up looking something like the image above.

The scenario: I have a source system called WideWorldImporters and a DWH database called DWH_DEV. After talking with the business and the developers, I have a list of tables I want to copy to my DWH.

In my test scenario, both databases are on the same database server, as shown below. In real life that is rarely the case, but the development process is the same.

I will do all my work in the DWH_DEV database. First, I want a schema for the source system, like so:

CREATE SCHEMA WideWorldImporters
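
The table and stored procedure further down both live in a schema called Framework, and the post does not show that schema being created. A minimal sketch, assuming it does not exist yet in DWH_DEV:

```sql
-- Create the Framework schema only if it is missing.
-- CREATE SCHEMA must be the first statement in its batch,
-- so it is wrapped in EXEC to allow the IF check in front of it.
IF SCHEMA_ID(N'Framework') IS NULL
    EXEC (N'CREATE SCHEMA [Framework]');
```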

Then I want a table that lists the tables I want to copy, along with information about where to copy them. Like this:

CREATE TABLE [Framework].[TablesToBeCopied](
	[SourceSystem] [NVARCHAR](250) NULL,		--The sourcesystem. We can have multiple
	[SourceSchema] [NVARCHAR](250) NULL,		--The Source schema, if it has one
	[SourceTable] [NVARCHAR](250) NULL,			--The Source table 
	[DestinationSchema] [NVARCHAR](250) NULL,	--The Destination Schema in your DWH
	[DestinationTable] [NVARCHAR](250) NULL		--The Destination table in your DWH
)

I want to add the two tables [Sales].[Invoices] and [Sales].[Customers], so my TablesToBeCopied table will look like this:
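
A minimal sketch of how those two rows could be inserted. The destination schema and destination table names are my assumptions here (the source table names reused inside the WideWorldImporters schema created earlier), so adjust them to your own naming:

```sql
-- Register the two source tables to copy.
-- Destination names are assumed: same table names, landed in the
-- WideWorldImporters schema of DWH_DEV.
INSERT INTO [Framework].[TablesToBeCopied]
    ([SourceSystem], [SourceSchema], [SourceTable], [DestinationSchema], [DestinationTable])
VALUES
    (N'WideWorldImporters', N'Sales', N'Invoices',  N'WideWorldImporters', N'Invoices'),
    (N'WideWorldImporters', N'Sales', N'Customers', N'WideWorldImporters', N'Customers');
```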

When I do a bulk load from different source systems, I want to drop the tables and recreate them when I copy them. The reason is that I don't want to worry about remapping columns if they change data types or are renamed. This may not be what you need, in which case you can skip this step. But I always create a stored procedure for it, like so:

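-- =============================================
-- Author:		CKC, DataWise
-- Create date: 20230719
-- Description:	Drop all tables in a schema. 
--				Primarily used for extract tables
-- Parameters
--				@Schema - The name of the schema whose tables we want to drop. 
--				It cannot be the dbo schema. 
-- Change history
-- 
--				20230719 CKC, DataWise
--					First version
-- =============================================
CREATE PROCEDURE [Framework].[dropTablesBySchema]
    @Schema NVARCHAR(250)	--Parameter type is an assumption; the name matches the EXEC call used in the pipeline
AS
BEGIN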
    /*Declare required variables*/
    DECLARE @SQLStatement NVARCHAR(MAX);

    IF @Schema = 'dbo'
    BEGIN
        RAISERROR('This procedure should not be used on the schema dbo', 16, 1);
    END;

    IF @Schema <> 'dbo'
    BEGIN
        /* BUILD TABLE DROP SQL*/
        SELECT @SQLStatement =
        (
            SELECT STUFF(
                   (
                       SELECT '; ' + 'DROP TABLE ' + QUOTENAME([SchemaName]) + '.' + QUOTENAME([TableName]) AS [text()]
                       FROM
                       (
                           SELECT SCHEMA_NAME(schema_id) AS [SchemaName],
                                  name AS [TableName]
                           FROM sys.tables
                           WHERE schema_id = SCHEMA_ID(@Schema)
                       ) x
                       FOR XML PATH('')
                   ),
                   1,
                   1,
                   ''
                        ) + ';'
        );
        /*EXECUTE DROP SQL*/
        EXEC sp_executesql @stmt = @SQLStatement;
    END;
END;
GO
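
Since the procedure drops every table in the schema, it can be worth previewing what it would remove before running it. A small sketch that uses the same filter as the procedure; the schema name is the one from this post:

```sql
-- Preview: the tables that dropTablesBySchema would drop for this schema.
SELECT SCHEMA_NAME(t.schema_id) AS SchemaName,
       t.name                   AS TableName
FROM sys.tables AS t
WHERE t.schema_id = SCHEMA_ID(N'WideWorldImporters');
```

If anything other than extract tables shows up in the result, move those tables to another schema first.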

Now we are ready to set up ADF.

We create a new pipeline and name it something meaningful, like pipelineCopyDynamicFromWorldWide.

We start by adding a script activity to the pipeline that will drop the tables. We add this SQL to it:

EXEC [Framework].[dropTablesBySchema]
		@Schema = N'WideWorldImporters'

After that, we add a lookup activity to get the tables we want to copy.

In the lookup activity we add this SQL, and make sure the First row only option is unchecked so that all rows are returned:

SELECT [SourceSystem]
      ,[SourceSchema]
      ,[SourceTable]
      ,[DestinationSchema]
      ,[DestinationTable]
  FROM [DWH_DEV].[Framework].[TablesToBeCopied]
  WHERE SourceSystem='WideWorldImporters'
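
Before wiring this into the pipeline, a quick check for typos in the list can save a failed run. This sketch assumes the test setup from this post, where the source database is named WideWorldImporters and sits on the same server as DWH_DEV, so three-part names work. With a remote source you would run the equivalent check against the source instead:

```sql
-- List configured tables that cannot be found in the source database.
-- Run from DWH_DEV; COLLATE DATABASE_DEFAULT avoids collation conflicts
-- if the two databases use different collations.
SELECT c.[SourceSchema],
       c.[SourceTable]
FROM [DWH_DEV].[Framework].[TablesToBeCopied] AS c
WHERE c.[SourceSystem] = N'WideWorldImporters'
  AND NOT EXISTS
  (
      SELECT 1
      FROM [WideWorldImporters].sys.tables AS t
      JOIN [WideWorldImporters].sys.schemas AS s
          ON s.schema_id = t.schema_id
      WHERE s.name COLLATE DATABASE_DEFAULT = c.[SourceSchema]
        AND t.name COLLATE DATABASE_DEFAULT = c.[SourceTable]
  );
```

An empty result means every configured table was found.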

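The pipeline now looks like this:

We now want to iterate over the tables with the ForEach iterator.

We drag it in and connect the lookup activity to it. Then we open the Settings tab and click Add dynamic content for the Items field, like below:

Then select the output of the lookup activity that gets the tables to copy, like below. Items should end up pointing at the lookup's output array, that is, an expression ending in .output.value.

Now we want to add an activity to our ForEach loop. To do that, click the add activity plus sign and select the copy data activity, like so: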

Then we set up the copy data activity. First we add the source. Start by connecting to a table in your WideWorldImporters database. When you are done, go to the dataset that was created, click on the Parameters tab, and add two parameters (one for the schema and one for the table), like this:

Then go to the Connection tab again, click the edit button, and use dynamic content to set the schema and table from those parameters, like so:

Do the same with the destination dataset.

Go to the copy activity again and set up the source and the sink. The setup should look like this:

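Here the schema and table properties are set dynamically from the current item of the ForEach loop, for example with @item().SourceSchema and @item().SourceTable, as shown.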

Do the same with the sink (the destination), but use the destination names from your SQL instead.

Remember to select the Auto create table option on the sink, since the destination tables are dropped before each run.

We can now run the pipeline. We should get a result like so:

And you should see the tables in your DWH like this:
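
If you would rather check from a query window, a sketch like the following lists the tables that landed in the destination schema together with approximate row counts. It assumes the destination schema is WideWorldImporters, as set up earlier in this post:

```sql
-- List the copied tables with approximate row counts.
-- index_id 0 is a heap, 1 is a clustered index; together they cover
-- the base data of each table exactly once.
SELECT s.name      AS SchemaName,
       t.name      AS TableName,
       SUM(p.rows) AS ApproxRows
FROM sys.tables AS t
JOIN sys.schemas AS s
    ON s.schema_id = t.schema_id
JOIN sys.partitions AS p
    ON p.object_id = t.object_id
   AND p.index_id IN (0, 1)
WHERE s.name = N'WideWorldImporters'
GROUP BY s.name, t.name
ORDER BY t.name;
```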

As with most things in ADF, the setup is a little complicated. But once you are up and running, it is easy to add a table to your daily copy job, and it is very easy to maintain.

You can add multiple source systems to the table that lists the tables you want to copy. I normally create a pipeline for each source system.
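
As a rough sketch of what a second source system could look like: the system name AdventureWorks and its table below are purely hypothetical examples, not something set up in this post. The destination schema would have to be created first, in the same way as the WideWorldImporters schema.

```sql
-- Hypothetical second source system; all names here are illustrative.
INSERT INTO [Framework].[TablesToBeCopied]
    ([SourceSystem], [SourceSchema], [SourceTable], [DestinationSchema], [DestinationTable])
VALUES
    (N'AdventureWorks', N'Sales', N'SalesOrderHeader', N'AdventureWorks', N'SalesOrderHeader');

-- The lookup in that system's own pipeline then only changes its filter.
SELECT [SourceSystem]
      ,[SourceSchema]
      ,[SourceTable]
      ,[DestinationSchema]
      ,[DestinationTable]
  FROM [DWH_DEV].[Framework].[TablesToBeCopied]
  WHERE SourceSystem = N'AdventureWorks';
```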

But this is it. I hope you can use it 🙂


