Data Factory, or Integrated Pipelines in the Synapse Analytics suite, can be very useful as an extraction and orchestration tool. A common scenario is to extract data from a source system and save it in a dedicated landing zone located in Azure Data Lake Storage Gen2. The question that then appears is which file format we should use to store this data. We have a few options here, but one of the most common choices is Parquet, which brings some great benefits like columnar storage and very good compression. It also comes with some limitations that we have to deal with. One of those limitations is that column names cannot contain special characters or spaces. If our source columns do have such prohibited names, we will end up with a very common error message:
Operation on target Copy data1 failed: ErrorCode=ParquetInvalidColumnName,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=The column name is invalid. Column name cannot contain these character:[,;{}()\n\t=],Source=Microsoft.DataTransfer.Common,'
So how do we deal with that? It depends on the source we have – in this article I will show you how to overcome such errors when the source is Azure SQL or any other T-SQL data source.
First of all, I will show you two Linked Services that I configured:
I configured connections to Azure SQL Database and Azure Data Lake Storage in a standard way. I used the system-assigned managed identity of my Data Factory instance to authenticate to the database (db_owner role) and to the storage account (Storage Blob Data Contributor role). Based on those two linked services I created two datasets.
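For reference, granting that managed identity access on the database side usually comes down to two statements – a minimal sketch, assuming the Data Factory (and therefore its managed identity) is called adf-demo; the storage role is assigned separately through Azure RBAC:

-- Run inside the target Azure SQL database while connected as an Azure AD admin.
-- 'adf-demo' is a hypothetical Data Factory name; its managed identity shares it.
CREATE USER [adf-demo] FROM EXTERNAL PROVIDER;

-- db_owner matches the setup described above; a narrower role such as db_datareader
-- is enough if the identity only needs to read data.
ALTER ROLE db_owner ADD MEMBER [adf-demo];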
The first one is for Azure SQL – as you can see, I also added two parameters, one for the schema and another for the table name. They are not strictly necessary for this demo, but I added them anyway:
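If you did want to use those parameters, the dataset's schema and table could be referenced dynamically with expressions like the ones below – just a sketch, assuming the parameters are named paramSchema and paramTable (in this walkthrough the Copy source ends up calling a stored procedure, so they stay unused):

@dataset().paramSchema
@dataset().paramTable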
The dataset for Data Lake Storage and the Parquet file is also simple: it points to the data container and the raw folder, and then I added an expression that dynamically builds the folder hierarchy based on the schema and the name of a table:
@concat(
    dataset().paramSchema, '/',
    dataset().paramTable, '/',
    dataset().paramTable, '.parquet'
)
The above configuration should result in data placed in the following path: data/raw/schema/table/table.parquet – for example, data/raw/Dimension/City/City.parquet for the table used later in this article.
Before we create a pipeline for our data extraction, we have to build a query that lists the objects we want to extract. In my Azure SQL instance I have the Wide World Importers DW database – to get the list of tables that are interesting for me, I used sys.tables and filtered this system view to the Fact and Dimension schemas only:
SELECT
     OBJECT_SCHEMA_NAME(object_id) AS schemaName
    ,OBJECT_NAME(object_id)        AS tableName
FROM sys.tables
WHERE OBJECT_SCHEMA_NAME(object_id) IN ('Fact','Dimension')
When we execute the above query, we should see the expected result:
Now we are sure that everything works as expected, so we can add a new pipeline with a Lookup activity and paste in the query we built above:
After execution we can check the output of this activity to know how to reference it in further steps:
So now we have a list of objects to extract. The next step is to build a query string that selects all columns, adds aliases without the prohibited characters, and skips columns whose data types are not supported. The query can look like this:
DECLARE @Query NVARCHAR(MAX)

SELECT @Query = CONCAT_WS(
     ' '
    ,'SELECT'
    ,STRING_AGG('[' + name + '] AS ' + '[' + REPLACE(name,' ','') + ']', ',')
    ,'FROM'
    ,'[' + OBJECT_SCHEMA_NAME(object_id) + '].[' + OBJECT_NAME(object_id) + ']'
)
FROM sys.columns
WHERE OBJECT_NAME(object_id) = 'City'
  AND OBJECT_SCHEMA_NAME(object_id) = 'Dimension'
  AND system_type_id IN (
        SELECT system_type_id
        FROM sys.types AS t
        WHERE name NOT IN ('hierarchyid','geometry','varbinary','geography')
      )
GROUP BY OBJECT_SCHEMA_NAME(object_id)
        ,OBJECT_NAME(object_id)

SELECT @Query
When I executed it for a sample table, I received the following SELECT statement, and it looks pretty OK:
SELECT
     [City Key] AS [CityKey]
    ,[WWI City ID] AS [WWICityID]
    ,[City] AS [City]
    ,[State Province] AS [StateProvince]
    ,[Country] AS [Country]
    ,[Continent] AS [Continent]
    ,[Sales Territory] AS [SalesTerritory]
    ,[Region] AS [Region]
    ,[Subregion] AS [Subregion]
    ,[Latest Recorded Population] AS [LatestRecordedPopulation]
    ,[Valid From] AS [ValidFrom]
    ,[Valid To] AS [ValidTo]
    ,[Lineage Key] AS [LineageKey]
FROM [Dimension].[City]
Our query is ready, so we can wrap it in a stored procedure and execute it for testing purposes:
CREATE PROC dbo.spGetDataOfSpecificObject
     @paramSchemaName NVARCHAR(100)
    ,@paramTableName  NVARCHAR(100)
AS
DECLARE @Query NVARCHAR(MAX)

SELECT @Query = CONCAT_WS(
     ' '
    ,'SELECT'
    ,STRING_AGG('[' + name + '] AS ' + '[' + REPLACE(name,' ','') + ']', ',')
    ,'FROM'
    ,'[' + OBJECT_SCHEMA_NAME(object_id) + '].[' + OBJECT_NAME(object_id) + ']'
)
FROM sys.columns
WHERE OBJECT_NAME(object_id) = @paramTableName
  AND OBJECT_SCHEMA_NAME(object_id) = @paramSchemaName
  AND system_type_id IN (
        SELECT system_type_id
        FROM sys.types AS t
        WHERE name NOT IN ('hierarchyid','geometry','varbinary','geography')
      )
GROUP BY OBJECT_SCHEMA_NAME(object_id)
        ,OBJECT_NAME(object_id)

EXEC sp_executesql @Query
EXEC dbo.spGetDataOfSpecificObject
     @paramSchemaName = 'Dimension'
    ,@paramTableName  = 'City'
The result is also correct so we can move forward:
Let’s go back to our pipeline and add a ForEach activity that will loop over the list of objects returned by the Lookup activity (I configured it to run sequentially, but it can also be executed in parallel):
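The Items property of the ForEach simply points at the array returned by the Lookup – a sketch, assuming the Lookup activity is named Lookup1 (use the name of your own activity):

@activity('Lookup1').output.value

Inside the loop, each object from that list is then available through @item().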
After that, I added a Copy activity inside the ForEach loop. The source is configured to execute the stored procedure presented above, which returns the data with adjusted column names:
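A hedged sketch of that source configuration – the procedure and parameter names come from the definition above, and the values are taken from the current ForEach item (the column aliases returned by the Lookup query):

Stored procedure name: dbo.spGetDataOfSpecificObject
paramSchemaName = @item().schemaName
paramTableName  = @item().tableName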
The sink is our Data Lake Storage:
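The two dataset parameters defined earlier are filled from the current loop item as well – again a sketch, reusing the parameter names from this article:

paramSchema = @item().schemaName
paramTable  = @item().tableName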
Everything is ready to go, so I executed the pipeline, and the result is visible in the screenshots below.
Everything works as expected. As you can see, it is pretty simple to extract data dynamically with ADF/Integrated Pipelines, although a few additional steps are needed. Of course, this is not the only way to achieve the final result – you can get a similar outcome in many different ways – but this is the method that was useful for me. I hope you enjoyed the article.
I did exactly the same when I faced that kind of limitation last time 🙂