As I work with Azure Data Factory (ADF) and help others in the community, I increasingly encounter confusion about how to construct a complete, dependency-driven ADF solution: one that chains multiple executions and handles all of your requirements. In this post I hope to address some of that confusion and allude to some emerging best practices for Azure Data Factory usage.
First a few simple questions:
- Why is there confusion? In my opinion this is because the ADF copy wizard available via the Azure portal doesn’t help you architect a complete solution. It can be handy for reverse engineering certain things, but the wizard tells you nothing about the choices you make or what the JSON behind it is doing. Like most wizards, it just leads to bad practices!
- Do I need several data factory services for different business functions? No, you don’t have to. Pipelines within a single data factory service can be disconnected for different processes, and having all your linked services in one place is often easier to manage. Plus a single factory offers reusability and means a single set of source code, etc.
- Do I need one pipeline per activity? No, you can house many activities in a single pipeline. Pipelines are just logic containers to assist you when managing data orchestration tasks. If you want an SSIS comparison, think of them as sequence containers. In a factory I may group all my on premises gateway uploads into a single pipeline. This means I can pause that stream of uploads on demand, maybe when the gateway key needs to be refreshed, etc.
- Is the whole data factory a pipeline? Yes, in concept. But for technical terminology a pipeline is a specific ADF component. The marketing people do love to confuse us!
- Can an activity support multiple inputs and multiple outputs? Generally yes. But there are exceptions depending on the activity type. U-SQL calls to Azure Data Lake can have multiples of both. ADF doesn’t care as long as you know what the called service is doing. On the other hand a copy activity needs to be one to one (so Microsoft can charge more for data movements).
- Does an activity have to have an input dataset? No. For example, you can create a custom activity that executes your code for a defined time slice without an input dataset, just the output.
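As a rough illustration of that last point, the sketch below shows a custom activity defined with only an output dataset. The assembly, package, linked service and dataset names are invented for the example rather than taken from a real factory.

{
  "name": "RunCustomCleaningCode",
  "type": "DotNetActivity",
  "outputs": [ { "name": "CustomCodeScheduleOut" } ],
  "linkedServiceName": "AzureBatchCompute",
  "typeProperties": {
    "assemblyName": "CustomCleaningCode.dll",
    "entryPoint": "CustomCleaningCode.CleanActivity",
    "packageLinkedService": "BlobStorage",
    "packageFile": "packages/CustomCleaningCode.zip"
  },
  "scheduler": { "frequency": "Day", "interval": 1 },
  "policy": { "concurrency": 1, "timeout": "01:00:00" }
}

The output dataset here does little more than give ADF a schedule to provision time slices against.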
Datasets
Moving on, let’s go a little deeper and think about a scenario that I use in my community talks. We have an on premises CSV file. We want to upload it, clean it and aggregate the output. For each stage of this process we need to define a dataset for Azure Data Factory to use.
To be clear, a dataset in this context is not the actual data. It is just a set of JSON instructions that defines where and how our data is stored: for example, its file path, its extension, its structure and its relationship to the executing time slice.
Let’s define each of the datasets we need in ADF to complete the above scenario for just one file:
- The on premises version of the file. Linked to information about the data management gateway to be used, with local credentials and file server/path where it can be accessed.
- A raw Azure version of the file. Linked to information about the data lake storage folder to be used for landing the uploaded file.
- A clean version of the file. Linked to information about the output directory of the cleaning process.
- The aggregated output file. Linked to information about the output directory of the query being used to do the aggregation.
All of the information linked to these datasets should come from your ADF linked services.
So, we have one file to process, but in ADF we now need four datasets, one defined for each stage of the data flow. These datasets don’t need to be complex; something as simple as the following bit of JSON will do.
{
  "name": "LkpsCurrencyDataLakeOut",
  "properties": {
    "type": "AzureDataLakeStore",
    "linkedServiceName": "DataLakeStore",
    "structure": [ ],
    "typeProperties": {
      "folderPath": "Out",
      "fileName": "Dim.Currency.csv"
    },
    "availability": {
      "frequency": "Day",
      "interval": 1
    }
  }
}
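For comparison, the on premises source dataset (number 1 in the list above) might look something like the sketch below. The linked service name, folder path and retry policy values are assumptions for illustration; the important parts are the FileShare type served by the gateway and the external flag, which tells ADF the data is produced outside the factory.

{
  "name": "OnPremCurrencyIn",
  "properties": {
    "type": "FileShare",
    "linkedServiceName": "OnPremFileServer",
    "external": true,
    "structure": [ ],
    "typeProperties": {
      "folderPath": "D:\\SourceFiles\\Lookups",
      "fileName": "Dim.Currency.csv",
      "format": { "type": "TextFormat", "columnDelimiter": "," }
    },
    "availability": {
      "frequency": "Day",
      "interval": 1
    },
    "policy": {
      "externalData": {
        "retryInterval": "00:01:00",
        "retryTimeout": "00:10:00",
        "maximumRetry": 3
      }
    }
  }
}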
Activities
Next, our activities. Now that the datasets are defined above, we need ADF to invoke the services that are going to do the work for each stage, as follows:
| Activity (JSON Value) | Task Description | Input Dataset | Output Dataset |
| --- | --- | --- | --- |
| Copy | Upload file from local storage to Data Lake storage. | 1 | 2 |
| DotNetActivity | Perform transformation/cleaning on raw source file. | 2 | 3 |
| DataLakeAnalyticsU-SQL | Aggregate the datasets to produce a reporting output. | 3 | 4 |
From the above table we can clearly see the output dataset of the first activity becomes the input of the second, and the output dataset of the second activity becomes the input of the third. Apologies if this seems obvious, but I have known it to confuse people.
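To make that chaining concrete, here is a rough sketch of how the first Copy activity might appear in a pipeline’s activities array. The dataset names (OnPremCurrencyIn, RawCurrencyDataLakeOut) and the source/sink types are illustrative assumptions rather than the exact definitions from my demo factory.

{
  "name": "CopyOnPremCsvToDataLake",
  "type": "Copy",
  "inputs": [ { "name": "OnPremCurrencyIn" } ],
  "outputs": [ { "name": "RawCurrencyDataLakeOut" } ],
  "typeProperties": {
    "source": { "type": "FileSystemSource" },
    "sink": { "type": "AzureDataLakeStoreSink" }
  },
  "scheduler": { "frequency": "Day", "interval": 1 },
  "policy": { "concurrency": 1, "timeout": "01:00:00" }
}

The DotNetActivity that follows would then list RawCurrencyDataLakeOut in its inputs and the clean dataset in its outputs, and so on down the chain.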
Pipelines
For our ADF pipeline(s) we can now make some decisions about how we want to manage the data flow.
- Add all the activities to a single pipeline, meaning we can stop/start everything for this one dataset end to end.
- Add each activity to a different pipeline dependent on its type. This is my starting preference.
- Have the on premises upload in one pipeline and everything else in a second pipeline.
- Maybe separate your pipelines and data flows depending on the type of data, e.g. fact/dimension, or finance and HR.
The point here is that it doesn’t matter to ADF; it’s just down to how you want to control it. When I created the pipelines for my talk demo I went with option 2, meaning I get the following pretty diagram, arranged to fit the width of my blog 🙂
Here we can clearly see at the top level each dataset flowing into a pipeline and its child activity. If I had constructed this using option 1 above, I would simply see the first dataset and the fourth, with one pipeline box. I could then drill into the pipeline to see the chain of activities within. I repeat, this doesn’t matter to ADF.
I hope you found the above useful and a good starting point for constructing your ADF data flows.
Best Practices
As our understanding of Azure Data Factory matures I’m sure some of the following points will need to be re-written, but for now I’m happy to go first and start laying the groundwork of what I consider to be best practice for ADF usage. Comments very welcome.
- Resist using the wizard, please.
- Keep everything within a single ADF service if you can. Meaning linked services can be reused.
- Keep your on premises uploads in a single, separate pipeline, for ease of management.
- Group your activities into natural pipeline containers for the operation type or data category.
- Lay out your ADF diagram carefully, left to right. It makes it much easier for others to understand.
- Use Visual Studio configuration files to deploy ADF projects between Dev/Test/Live. Ease of source control and development.
- Monitor activity concurrency and time outs carefully. ADF will kill called service executions if breached.
- Be mindful of activity cost and group inputs/outputs for data compute where possible.
- Use time slices to control your data volumes, e.g. pass the time slice as a parameter to the called compute service (see the sketch below).
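As a rough illustration of that last point, the U-SQL activity below passes its time slice start to the script using ADF’s $$Text.Format macro. The script path, linked service names, dataset names and the SliceDate parameter name are assumptions for the example.

{
  "name": "AggregateCurrencyBySlice",
  "type": "DataLakeAnalyticsU-SQL",
  "inputs": [ { "name": "CleanCurrencyDataLakeOut" } ],
  "outputs": [ { "name": "AggregatedCurrencyDataLakeOut" } ],
  "linkedServiceName": "DataLakeAnalytics",
  "typeProperties": {
    "scriptPath": "scripts/AggregateCurrency.usql",
    "scriptLinkedService": "BlobStorage",
    "degreeOfParallelism": 3,
    "parameters": {
      "SliceDate": "$$Text.Format('{0:yyyy-MM-dd}', SliceStart)"
    }
  },
  "scheduler": { "frequency": "Day", "interval": 1 },
  "policy": { "concurrency": 1, "timeout": "02:00:00" }
}

The U-SQL script can then use the SliceDate value to restrict what each execution reads, keeping the data volume per slice predictable.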
What next? Well, I’m currently working on this beast…
- 127x datasets.
- 71x activities.
- 9x pipelines.
… and I’ve got about another third left to build!
Many thanks for reading.
What is the execution order of activities if a pipeline has multiple .NET custom activities?
Hi Pawan, thanks for your comment. This really depends on the time slices provisioned for each of your activities. Cheers Paul
How do we manage dependencies between pipelines? Say pipeline1 and pipeline2 run in parallel and pipeline3 needs to run immediately after pipeline1 and pipeline2 are complete, how can we achieve this using ADF v1.0?
Hi Lakshmi, thanks for your comment. You need to deal with this at the dataset level. It is datasets that are dependency driven, not pipelines. In “pipeline 3” (as you describe it) your activity simply needs to have both inputs included; these inputs will be the outputs of your first two pipelines. Apologies if the concepts and terminology here don’t make explaining it very easy. But it’s certainly possible. Cheers Paul
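As a rough illustration of this, with invented dataset and script names, the dependent activity in the third pipeline would list both upstream outputs in its inputs array, something like:

{
  "name": "CombineUpstreamOutputs",
  "type": "DataLakeAnalyticsU-SQL",
  "inputs": [
    { "name": "Pipeline1OutputDataset" },
    { "name": "Pipeline2OutputDataset" }
  ],
  "outputs": [ { "name": "Pipeline3OutputDataset" } ],
  "linkedServiceName": "DataLakeAnalytics",
  "typeProperties": {
    "scriptPath": "scripts/Combine.usql",
    "scriptLinkedService": "BlobStorage"
  },
  "scheduler": { "frequency": "Day", "interval": 1 }
}

ADF will not execute this activity’s time slice until both input dataset slices are ready, which gives you the dependency on pipelines 1 and 2.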
Thanks Paul, the concept is much clearer.
Hey Paul, if I have 3 activities in one pipeline that are sequentially dependent, is there a way to downstream cascade re-running of activities 2 & 3 if I re-run activity 1?
Hi Tony, the PowerShell cmdlet to override the time slice status would be the simplest way. It has the option to “cascade” (as you describe it) from the defined dataset. Cheers
Hi Paul,
Thanks for such a nice article.
I have a custom activity as my first pipeline, which has only one dummy output dataset, and then I want to run U-SQL scripts after my first pipeline. Can you guide me on how we can address this?
Thanks
Nutan Patel
Hey Paul. Thanks for speaking at PASS Summit. It was a good session although I have moved on to ADF v2.
Having a heck of a time finding JSON property documentation for V2. MS should have a page that just lists each object, and each JSON property available in the object, and what the allowable values are for those properties.
Just trying to figure out what supports a parameters node or not, for instance, as I’m attempting to genericize a dataset definition by passing a table name parameter to it.
Hi Luis, thanks for the comment and thank you for coming to my session at PASS. I feel your pain for ADFv2. It is one of the reasons I’ve resisted using it so far. Until it’s GA and has a reasonable developer environment it’s going to be hard. Are you using .Net to develop or PowerShell currently?
I started with PowerShell. Using PowerShell is pretty easy as the AzureRm commands are actually pretty well documented. It’s just the JSON that is not, so I’m trying different things to see if something actually works 🙂
I managed to create a “generic” dataset that I can pass a table parameter to. It actually works:
{
  "name": "Dataset_AzureSqlTable_Generic",
  "properties":
  {
    "type": "AzureSqlTable",
    "typeProperties":
    {
      "tableName":
      {
        "value": "@dataset().TableName",
        "type": "Expression"
      }
    },
    "linkedServiceName":
    {
      "referenceName": "LinkedService_AzureSqlDatabase",
      "type": "LinkedServiceReference"
    },
    "parameters":
    {
      "TableName":
      {
        "type": "String"
      }
    }
  }
}
I use a Lookup activity in my master pipeline to get a list of tables I need to work on by calling that and passing an “ArchiveTables” parameter which is a table in the database that has the table list. That works great.
The next activity in my master pipeline is an ExecutePipeLine activity, which references the values returned from the first lookup activity.
{
  "name": "Pipeline_TriggerArchiveTables",
  "description": "Uses the LookupArchiveTables activity to get the tables to archive and passes that list down to the ArchiveTables pipeline.",
  "properties":
  {
    "activities":
    [
      {
        "name": "LookupArchiveTables",
        "description": "Reads all rows from the ArchiveTables table that have the Enabled column set to 1.",
        "type": "Lookup",
        "typeProperties":
        {
          "source":
          {
            "type": "SqlSource",
            "sqlReaderQuery": "SELECT TableName, DateTimeColumnName FROM ArchiveTables WHERE Enabled = 1 ORDER BY TableName"
          },
          "dataset":
          {
            "referenceName": "Dataset_AzureSqlTable_Generic",
            "type": "DatasetReference",
            "parameters":
            {
              "TableName":
              {
                "value": "ArchiveTables",
                "type": "Expression"
              }
            }
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "ExecuteArchiveTables",
        "type": "ExecutePipeLine",
        "typeProperties":
        {
          "pipeline":
          {
            "referenceName": "Pipeline_ArchiveTables",
            "type": "PipelineReference"
          },
          "parameters":
          {
            "TableList":
            {
              "value": "@{activity('LookupArchiveTables').output.value}",
              "type": "Expression"
            }
          },
          "waitOnCompletion": true
        },
        "dependsOn":
        [
          {
            "activity": "LookupArchiveTables",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      }
    ]
  }
}
That seems to all work as well. The pipeline “Pipeline_ArchiveTables” has a foreach activity that iterates through the table list. I followed the online examples from MS to the letter. I confirmed in the log that the table and column names were being properly passed into the TableList parameter so I know it’s all good up to this pipeline. I kept it simple for troubleshooting by simply inserting the same lookup activity I know worked earlier to see if the foreach works, but it consistently fails with a 400 error. It complains about the inner activity, but I know that activity works. Just been staring at it wondering what the problem is. It seems rather cut and dry to me. It should do one iteration since there is only one table in the TableList parameter, and the lookup activity should be executed.
{
  "errorCode": "400",
  "message": "Activity failed because an inner activity failed",
  "failureType": "UserError",
  "target": "IterateArchiveTables"
}
{
  "name": "Pipeline_ArchiveTables",
  "properties":
  {
    "activities":
    [
      {
        "name": "IterateArchiveTables",
        "description": "Executes in parallel the ArchiveTable pipeline for each table passed to it from the TriggerArchiveTables pipeline.",
        "type": "ForEach",
        "typeProperties":
        {
          "isSequential": false,
          "items":
          {
            "value": "@pipeline().parameters.TableList",
            "type": "Expression"
          },
          "activities":
          [
            {
              "name": "LookupArchiveTables",
              "description": "Reads all rows from the ArchiveTables table that have the Enabled column set to 1.",
              "type": "Lookup",
              "typeProperties":
              {
                "source":
                {
                  "type": "SqlSource",
                  "sqlReaderQuery": "SELECT TableName, DateTimeColumnName FROM ArchiveTables WHERE Enabled = 1 ORDER BY TableName"
                },
                "dataset":
                {
                  "referenceName": "Dataset_AzureSqlTable_Generic",
                  "type": "DatasetReference",
                  "parameters":
                  {
                    "TableName":
                    {
                      "value": "ArchiveTables",
                      "type": "Expression"
                    }
                  }
                },
                "firstRowOnly": false
              }
            }
          ]
        }
      }
    ],
    "parameters":
    {
      "TableList":
      {
        "type": "Array"
      }
    }
  }
}
}