Generating model.json as a part of SSIS export of legacy data into ADLS

Introduction

In the previous article, we saw how to export data from all, or only selected, tables into ADLS using an SSIS control flow. In this article, we will extend that example with a model.json export, so that the data can later be easily consumed using Power BI Dataflows.

Adding a new script task to generate model.json

First, add a new Script Task to the control flow:
Control flow with model_json

You will need to introduce a new variable EXPORT_PATH_ADLS of type string:
Control flow with model_json introducing new variable

and set it to contain a path prefix consisting of the blob URL and the directory path. In the new Export model_json Script Task editor, set the ReadOnlyVariables property to User::DB_CONNECTION, User::EXPORT_PATH, User::EXPORT_PATH_ADLS, User::TABLE_NAMES:
Control flow with export model_json task editor
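
For illustration, the value of EXPORT_PATH_ADLS could look like https://yourstorageaccount.blob.core.windows.net/archive/ax2009 – the storage account, container and directory names here are only placeholders; use the ones you created for the upload in the previous article.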

After clicking on Edit script, replace the Main method with the following C# code (and add the helper methods below it) to generate model.json in the same directory as the .csv files:

public void Main()
{
    // Generate an entry in model.json for every exported table; BLOB (image) columns are skipped
    object rawDBConnection = null;
    string dbConnection = null;
    try
    {

        // Retrieve variables
        if (!Dts.Variables.Contains("TABLE_NAMES"))
        {
            Dts.Events.FireError(18, "TABLE_NAMES", "Variable missing", "", 0);
            throw new Exception("Variable TABLE_NAMES is missing.");
        }
        if (!Dts.Variables.Contains("EXPORT_PATH"))
        {
            Dts.Events.FireError(18, "EXPORT_PATH", "Variable missing", "", 0);
            throw new Exception("Variable EXPORT_PATH is missing.");
        }
        if (!Dts.Variables.Contains("EXPORT_PATH_ADLS"))
        {
            Dts.Events.FireError(18, "EXPORT_PATH_ADLS", "Variable missing", "", 0);
            throw new Exception("Variable EXPORT_PATH_ADLS is missing.");
        }
        if (!Dts.Variables.Contains("DB_CONNECTION"))
        {
            Dts.Events.FireError(18, "DB_CONNECTION", "Variable missing", "", 0);
            throw new Exception("Variable DB_CONNECTION is missing.");
        }
        var tableNames = (DataSet)Dts.Variables["TABLE_NAMES"].Value;
        string exportPathADLS = (string)Dts.Variables["EXPORT_PATH_ADLS"].Value;
        string exportPath = (string)Dts.Variables["EXPORT_PATH"].Value;
        dbConnection = (string)Dts.Variables["DB_CONNECTION"].Value;

        //Retrieve DB connection
        rawDBConnection = Dts.Connections[dbConnection].AcquireConnection(Dts.Transaction);
        SqlConnection sqlConnection = (SqlConnection)rawDBConnection;

        using (var sw = new StreamWriter(Path.Combine(exportPath, "model.json"), false) { NewLine = "\n" })
        {
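            // Fixed header metadata and annotations of the model.json document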
            sw.Write("{\n");
            sw.Write("  \"application\": \"Dynamics AX 2009\",\n");
            sw.Write("  \"name\": \"Archive\",\n");
            sw.Write("  \"description\": \"Archive\",\n");
            sw.Write("  \"version\": \"1.0\",\n");
            sw.Write("  \"culture\": \"en-US\",\n");
            sw.Write("  \"modifiedTime\": \"" + DateTime.UtcNow.ToString("U") + "\",\n");
            sw.Write("  \"annotations\": [\n");
            sw.Write("    {\n");
            sw.Write("      \"name\": \"D365FO:ApplicationBuildVersion\",\n");
            sw.Write("      \"value\": \"10.0.319.10005\"\n");
            sw.Write("    },\n");
            sw.Write("    {\n");
            sw.Write("      \"name\": \"D365FO:ModelName\",\n");
            sw.Write("      \"value\": \"Archive\"\n");
            sw.Write("    },\n");
            sw.Write("    {\n");
            sw.Write("      \"name\": \"D365FO:ModelPublisher\",\n");
            sw.Write("      \"value\": \"Demo\"\n");
            sw.Write("    }\n");
            sw.Write("  ],\n");
            sw.Write("  \"entities\": [\n");

            StringBuilder sb = new StringBuilder();
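            // Append one LocalEntity entry per exported table, built from INFORMATION_SCHEMA metadata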
            foreach (DataTable dataTable in tableNames.Tables)
            {
                foreach (DataRow dataRow in dataTable.Rows)
                {
                    string currentTableName = (string)dataRow.ItemArray[0];
                    if (!String.IsNullOrWhiteSpace(currentTableName))
                    {
                        sb.Append("    {\n");
                        sb.Append("      \"$type\": \"LocalEntity\",\n");
                        sb.Append("      \"name\": \"" + currentTableName + "\",\n");
                        sb.Append("      \"description\": \"" + currentTableName + "\",\n");
                        sb.Append("      \"attributes\": [\n");

                        var columnInfo = GetColumnInfo(sqlConnection, currentTableName);
                        string[] columnNames = GetColumnNames(columnInfo);
                        string[] columnTypes = GetColumnTypes(columnInfo);

                        for (int i = 0; i < columnNames.Length; i++)
                        {
                            string currentColumnTypeJSON = "";
                            switch (columnTypes[i])
                            {
                                case "int":
                                    currentColumnTypeJSON = "int64";
                                    break;
                                case "datetime":
                                    currentColumnTypeJSON = "dateTime";
                                    break;
                                case "numeric":
                                    currentColumnTypeJSON = "double";
                                    break;
                                case "uniqueidentifier":
                                    currentColumnTypeJSON = "string";
                                    break;
                                case "bigint":
                                    currentColumnTypeJSON = "int64";
                                    break;
                                case "ntext":
                                    currentColumnTypeJSON = "string";
                                    break;
                                case "nvarchar":
                                    currentColumnTypeJSON = "string";
                                    break;
                                default:
                                    currentColumnTypeJSON = "string";
                                    break;

                            }
                            sb.Append("        {\n");
                            sb.Append("          \"name\": \"" + columnNames[i] + "\",\n");
                            sb.Append("          \"description\": \"" + columnNames[i] + "\",\n");
                            sb.Append("          \"dataType\": \"" + currentColumnTypeJSON + "\"\n");
                            if (i < (columnNames.Length - 1))
                                sb.Append("        },\n");
                            else
                                sb.Append("        }\n");
                        }
                        sb.Append("      ],\n");
                        sb.Append("      \"partitions\": [\n");
                        sb.Append("        {\n");
                        sb.Append("          \"name\": \"" + currentTableName + "\",\n");
                        sb.Append("          \"location\": \"" + exportPathADLS.TrimEnd('/') + "/" + currentTableName + ".csv\"\n");
                        sb.Append("        }\n");
                        sb.Append("      ]\n");
                        sb.Append("    },\n");
                    }
                }
            }
            if (sb.Length > 3) //remove the last ",\n"
                sb.Remove(sb.Length - 2, 2);
            sw.Write(sb.ToString());

            sw.Write("\n");
            sw.Write("  ],\n");
            sw.Write("  \"relationships\": [\n");
            sw.Write("  ]\n");
            sw.Write("}\n");

        }
        Dts.Connections[dbConnection].ReleaseConnection(rawDBConnection);

        Dts.TaskResult = (int)ScriptResults.Success;
    }
    catch (Exception e)
    {
        try
        {
            if (rawDBConnection != null)
                Dts.Connections[dbConnection].ReleaseConnection(rawDBConnection);
        }
        catch { }
        Dts.Log(e.Message, (int)ScriptResults.Failure, null);
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
}

private List<Tuple<string, string>> GetColumnInfo(SqlConnection sqlConnection, string tableName)
{
    var retVal = new List<Tuple<string, string>>();
    using (var sqlCommand = new SqlCommand("SELECT COLUMN_NAME,DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='" + tableName + "' AND COLUMN_NAME NOT LIKE 'DEL_%'", sqlConnection))
    {
        using (var reader = sqlCommand.ExecuteReader())
        {
            while (reader.Read())
            {
                var columnName = reader.GetString(0);
                var columnType = reader.GetString(1);
                if (!String.Equals(columnType, "image", StringComparison.InvariantCultureIgnoreCase))
                    retVal.Add(new Tuple<string, string>(columnName, columnType));
            }
        }
    }
    return retVal;
}
private string[] GetColumnNames(List<Tuple<string, string>> columns)
{
    var retVal = new List<string>();
    foreach (var column in columns)
    {
        retVal.Add(column.Item1);
    }
    return retVal.ToArray();
}
private string[] GetColumnTypes(List<Tuple<string, string>> columns)
{
    var retVal = new List<string>();
    foreach (var column in columns)
    {
        retVal.Add(column.Item2);
    }
    return retVal.ToArray();
}
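
As with the export script from the previous article, the task will only compile if the required using directives are present at the top of ScriptMain.cs (depending on the script template, some of them may already be there):

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Text;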

After running the package, you should see something similar to the following screen:

Control flow with model_json result in storage explorer
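
The generated model.json should look similar to the following abridged example, shown here for a single hypothetical table named CUSTGROUP with only two columns; the entity list, the modifiedTime value and the partition location will of course reflect your own tables and ADLS path:

{
  "application": "Dynamics AX 2009",
  "name": "Archive",
  "description": "Archive",
  "version": "1.0",
  "culture": "en-US",
  "modifiedTime": "Sunday, 05 April 2020 10:15:30",
  "annotations": [
    {
      "name": "D365FO:ApplicationBuildVersion",
      "value": "10.0.319.10005"
    },
    {
      "name": "D365FO:ModelName",
      "value": "Archive"
    },
    {
      "name": "D365FO:ModelPublisher",
      "value": "Demo"
    }
  ],
  "entities": [
    {
      "$type": "LocalEntity",
      "name": "CUSTGROUP",
      "description": "CUSTGROUP",
      "attributes": [
        {
          "name": "CUSTGROUP",
          "description": "CUSTGROUP",
          "dataType": "string"
        },
        {
          "name": "RECID",
          "description": "RECID",
          "dataType": "int64"
        }
      ],
      "partitions": [
        {
          "name": "CUSTGROUP",
          "location": "https://yourstorageaccount.blob.core.windows.net/archive/ax2009/CUSTGROUP.csv"
        }
      ]
    }
  ],
  "relationships": [
  ]
}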

Summary

With a small extension to the SSIS control flow designed in the previous article, we have turned our data lake based archive into a CDM (Common Data Model) folder. This will allow data analysts to easily use this data in Power BI dashboards by connecting to ADLS using Power BI Dataflows.

Exporting legacy ERP data into Azure Data Lake Storage (ADLS gen2) using SSIS

Introduction

When implementing or upgrading to Dynamics 365 Finance and Operations (or any other ERP system), we need to decide what to do with the legacy data – be it a collection of tables from information system databases replaced by the new ERP, or Excel files used for ad-hoc process management. Ideally, we only want to import a bare minimum of master data and balances into the new ERP, as additional disk space consumption is almost always associated with extra costs and slower performance. At the same time, legacy data often can’t simply be deleted. For the next few years, users will still need to be able to use legacy data as an input for various dashboards, reports or machine learning pipelines. In this article we will explore how to use the Azure Blob Upload Task from the Azure Feature Pack for Integration Services (SSIS) to upload the old SQL data into ADLS gen2 storage. We are going to use an SSIS project with only four tasks to achieve it:

  • Execute SQL Task to retrieve names of the tables to export
  • Foreach Loop Container to loop over the retrieved table names
  • Script Task to export each table into a directory defined in a user variable
  • Azure Blob Upload Task to upload the generated .csv files into ADLS gen2 blob container

As an example of a legacy ERP system, we will be using Microsoft Dynamics AX 2009, which is still widely used by many customers.

Prerequisites

To follow along, you will need Visual Studio 2017 with SQL Server Data Tools (SSDT), the Azure Feature Pack for Integration Services (which provides the Azure Blob Upload Task), access to the legacy SQL Server database (a Dynamics AX 2009 database in this example) and an ADLS gen2 storage account with a blob container and directory prepared for the upload (see Notes 1 at the end of this article).

Creating and running an integration project

  • Start Visual Studio 2017 (SSDT)
  • Click on File->New->Project, select the Integration Services (Azure enabled) project, give it a meaningful name and press Enter Create SSIS Project
  • You should see a screen similar to this: Empty SSIS Project
  • Right-click the Connection Managers pane at the bottom of the screen, select New ADO.NET Connection, choose .Net Providers\SqlClient Data Provider, enter your database connection information and click OK Configure ADO Net Connection
  • To prepare the variables we will need later on to exchange data between the tasks, select the Variables pane and click the New button to create the variables TABLE_NAMES (Object), CURRENT_TABLE_NAME (String), DB_CONNECTION (String) and EXPORT_PATH (String). My database connection’s name is DynamicsAX2009.AXDB and the temporary file store path is C:\Temp\tmpssis; yours are likely to be different Variables in SSIS Project
  • Drag and drop Execute SQL Task from the SSIS Toolbox on the left into the control flow, and double click the task to open the edit dialog. Rename the task to GetAllAX2009Tables, select the connection DynamicsAX2009.AXDB created previously and set the ResultSet property to Full result set. Modify the select statement to retrieve the tables containing relevant data from the database schema by editing the SQLStatement property Execute SQL Task Dialog and entering a SQL statement similar to this:
    SELECT TABLE_NAME
      FROM INFORMATION_SCHEMA.TABLES
     WHERE TABLES.TABLE_TYPE='BASE TABLE'
       AND TABLES.TABLE_NAME NOT LIKE('USER%')
       AND TABLES.TABLE_NAME NOT LIKE('DEL_%')
       AND TABLES.TABLE_NAME NOT LIKE('SYS%')
       AND TABLES.TABLE_NAME NOT LIKE('%TMP')
       AND TABLES.TABLE_NAME NOT LIKE('BATCH%')
       AND TABLES.TABLE_NAME NOT LIKE('SQL%')
       AND TABLES.TABLE_NAME NOT LIKE('XREF%')
       AND TABLES.TABLE_NAME NOT LIKE('EVENT%')
       AND TABLES.TABLE_NAME NOT LIKE('PRINT%')
       AND TABLES.TABLE_NAME NOT LIKE('AIF%')
       AND TABLES.TABLE_NAME NOT LIKE('STAGING%')
       AND TABLES.TABLE_NAME NOT LIKE('WORKFLOW%')
       AND TABLES.TABLE_NAME NOT LIKE '%PARM%'
  • Map the Result Set to the TABLE_NAMES variable by clicking the Add button Execute SQL Task ResultSet Dialog and click OK
  • Drag and drop a Foreach Loop Container below the GetAllAX2009Tables task and connect the arrow from the SQL task to this Foreach Loop Container For each loop container
  • Double click the Foreach Loop Container, rename it to Foreach TABLE_NAMES Loop and, on the Collection tab, set Enumerator to Foreach ADO Enumerator and the ADO Source object variable to User::TABLE_NAMES For each loop container collection
  • On the Variable Mappings tab, add a mapping to the User::CURRENT_TABLE_NAME variable and press OK For each loop container variable mapping
  • Drag and drop a Script Task inside the Foreach Loop Container and rename it to Export CURRENT_TABLE_NAME. Your screen should look like this: Script task created
  • Double click the Export CURRENT_TABLE_NAME task; make sure that Microsoft Visual C# 2017 is selected in the ScriptLanguage property and User::CURRENT_TABLE_NAME, User::DB_CONNECTION and User::EXPORT_PATH are selected in the ReadOnlyVariables property Script Task Editor
  • Click on Edit script. This will open a new Visual Studio instance with a new C# project and the ScriptMain.cs file opened at the Main method: ScriptTask VS Editor
  • This code is going to export the table named in the CURRENT_TABLE_NAME variable into the EXPORT_PATH directory using the SqlConnection and StreamWriter classes. To do that, replace the Main method and add a few helper methods: ScriptTask VS Editor Main Method
    public void Main()
    {
        // Export all the columns from the table, but skips the BLOBs
        object rawDBConnection = null;
        string dbConnection = null;
        try
        {
            // Retrieve variables
            if (!Dts.Variables.Contains("CURRENT_TABLE_NAME"))
            {
                Dts.Events.FireError(18, "CURRENT_TABLE_NAME", "Variable missing", "", 0);
                throw new Exception("Variable CURRENT_TABLE_NAME is missing.");
            }
            if (!Dts.Variables.Contains("EXPORT_PATH"))
            {
                Dts.Events.FireError(18, "EXPORT_PATH", "Variable missing", "", 0);
                throw new Exception("Variable EXPORT_PATH is missing.");
            }
            if (!Dts.Variables.Contains("DB_CONNECTION"))
            {
                Dts.Events.FireError(18, "DB_CONNECTION", "Variable missing", "", 0);
                throw new Exception("Variable DB_CONNECTION is missing.");
            }
    
            string currentTableName = (string)Dts.Variables["CURRENT_TABLE_NAME"].Value;
            string exportPath = (string)Dts.Variables["EXPORT_PATH"].Value;
            dbConnection = (string)Dts.Variables["DB_CONNECTION"].Value;
    
            //Retrieve DB connection
            rawDBConnection = Dts.Connections[dbConnection].AcquireConnection(Dts.Transaction);
            SqlConnection sqlConnection = (SqlConnection)rawDBConnection;
    
            var columnInfo = GetColumnInfo(sqlConnection, currentTableName);
            string columns = GetColumns(columnInfo);
            string[] columnTypes = GetColumnTypes(columnInfo);
            var sb = new StringBuilder();
    
            if (!String.IsNullOrWhiteSpace(columns))
            {
                using (var sw = new StreamWriter(Path.Combine(exportPath, currentTableName) + ".csv", false) { NewLine = "\n" })
                {
                    //when using in a CDM folder with model.json, skip writing the header line below
                    sw.WriteLine(columns);
                    using (var sqlCommand = new SqlCommand(String.Format("SELECT {0} FROM {1}", columns, currentTableName), sqlConnection))
                    {
                        using (var reader = sqlCommand.ExecuteReader())
                        {
                            while (reader.Read())
                            {
                                sb.Clear();
                                for (int i = 0; i < columnTypes.Length; i++) 
                                {
                                    if (reader.IsDBNull(i))
                                    {
                                        sb.Append(",");
                                        continue;
                                    }
                                    switch (columnTypes[i]) 
                                    { 
                                        case "nvarchar": 
                                            sb.Append(",\""); 
                                            string strVal = reader.GetString(i); 
                                            if (strVal == null) 
                                                strVal = ""; 
                                            if (strVal.Contains("\"")) 
                                                strVal = strVal.Replace("\"", "\"\""); 
                                            sb.Append(strVal).Append("\""); 
                                            break; 
                                        case "int": 
                                            sb.Append(",").Append(reader.GetInt32(i)); 
                                            break; 
                                        case "bigint": 
                                            sb.Append(",").Append(reader.GetInt64(i)); 
                                            break; 
                                        case "datetime":  
                                           sb.Append(",").Append(reader.GetDateTime(i).ToString("u").Replace(" ", "T")); 
                                           break;
                                        case "numeric": 
                                            sb.Append(",").Append(reader.GetDecimal(i).ToString(CultureInfo.InvariantCulture)); 
                                            break;
                                        case "uniqueidentifier": 
                                            sb.Append(",").Append(reader.GetGuid(i).ToString("D")); 
                                            break; 
                                        case "ntext": 
                                            sb.Append(",\""); 
                                            string txtVal = reader.GetString(i); 
                                            if (txtVal == null) 
                                                txtVal = ""; 
                                            if (txtVal.Contains("\"")) 
                                                txtVal = txtVal.Replace("\"", "\"\""); 
                                            sb.Append(txtVal).Append("\""); 
                                            break; 
                                        default: 
                                            sb.Append(","); 
                                            break; 
                                    } 
                                } 
                                if (sb.Length > 0)
                                {
                                    sb.Remove(0, 1);
                                    sw.WriteLine(sb.ToString());
                                }
                            }
                        }
                    }
                }
            }
    
            Dts.Connections[dbConnection].ReleaseConnection(rawDBConnection);
    
            //MessageBox.Show(currentTableName);
    
            Dts.TaskResult = (int)ScriptResults.Success;
        }
        catch (Exception e)
        {
            try
            {
                if (rawDBConnection != null)
                    Dts.Connections[dbConnection].ReleaseConnection(rawDBConnection);
            }
            catch { }
            Dts.Log(e.Message, (int)ScriptResults.Failure, null);
            Dts.TaskResult = (int)ScriptResults.Failure;
        }
    }
    
    private List<Tuple<string, string>> GetColumnInfo(SqlConnection sqlConnection, string tableName)
    {
        var retVal = new List<Tuple<string, string>>();
        using (var sqlCommand = new SqlCommand("SELECT COLUMN_NAME,DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='" + tableName + "' AND COLUMN_NAME NOT LIKE 'DEL_%'", sqlConnection))
        {
            using (var reader = sqlCommand.ExecuteReader())
            {
                while (reader.Read())
                {
                    var columnName = reader.GetString(0);
                    var columnType = reader.GetString(1);
                    if (!String.Equals(columnType, "image", StringComparison.InvariantCultureIgnoreCase))
                        retVal.Add(new Tuple<string, string>(columnName, columnType));
                }
            }
        }
        return retVal;
    }
    private string GetColumns(List<Tuple<string, string>> columns)
    {
        string retVal = "";
        foreach (var column in columns)
        {
            retVal += "," + column.Item1;
        }
        if (retVal.Length > 0)
            retVal = retVal.Substring(1);
        return retVal;
    }
    private string[] GetColumnTypes(List<Tuple<string, string>> columns)
    {
        var retVal = new List<string>();
        foreach (var column in columns)
        {
            retVal.Add(column.Item2);
        }
        return retVal.ToArray();
    }
    
  • If needed, tweak the code to modify and enhance the exported data, for example by skipping the ModifiedTransactionId column or by adding virtual columns _Year, _Month and _Day for every column of type datetime (a sketch of this is shown after this list). In a later article, we will modify the code to convert enum values (integers) to human readable strings.
  • You will also need to extend the using directives at the top of ScriptMain.cs to avoid compilation errors:
    using System;
    using System.Collections.Generic;
    using System.Data.SqlClient;
    using System.Globalization;
    using System.IO;
    using System.Text;
  • Save the script, close this Visual Studio window and confirm the Script Task Editor with OK
  • Click the Save All button
  • To test that the export is working, click on the Start button and you should see a screen like this: Export test run
  • And finally, the export directory will contain CSV exports of all the database tables retrieved by the first SELECT statement: Export test run results
  • To upload the data into the Azure Blob container in ADLS gen2, drag and drop the Azure Blob Upload Task onto the Control Flow view and attach an arrow from the Foreach TABLE_NAMES Loop task to it: Control Flow after adding blob upload task
  • Right-click the Connection Managers pane and choose Add connection manager, select the Blob storage service and fill in the connection details: Adding Azure Storage Connection Manager. Make sure that the storage account selected here has the Hierarchical Namespace enabled for later consumption from Power BI or Auto ML
  • Press OK, rename the newly created SSIS Connection Manager to AzureStorage
  • Double click on the Azure Blob Upload Task and set the AzureStorageConnection, BlobContainer, BlobDirectory and LocalDirectory properties. Azure blob upload task editor The Blob Container must exist in the Azure Storage Account and the Blob Directory has to be created within this Blob Container. See the notes at the end of this article on how to do that. At a minimum, check that both the Blob Container and the Blob Directory exist using the Azure Storage Explorer: Azure storage explorer
  • The last remaining task is to run the package by clicking on the Start button Flow control final run
  • After the package run has finished, check the Azure Storage Explorer to see the generated files Azure storage explorer viewing results
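
As mentioned in the code-tweaking bullet above, one possible enhancement is to add virtual _Year, _Month and _Day columns for every datetime column. The following is only a minimal sketch of how the datetime case in the switch statement of the Export CURRENT_TABLE_NAME script could be extended; the matching _Year, _Month and _Day column names would also have to be added to the header line (and, if you later generate model.json, to the entity attributes):

    case "datetime":
        DateTime dateValue = reader.GetDateTime(i);
        // Original value in the ISO-like format used above
        sb.Append(",").Append(dateValue.ToString("u").Replace(" ", "T"));
        // Hypothetical virtual columns _Year, _Month and _Day for easier filtering
        sb.Append(",").Append(dateValue.Year);
        sb.Append(",").Append(dateValue.Month);
        sb.Append(",").Append(dateValue.Day);
        break;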

Summary

In this article, we have seen how to export legacy data into ADLS gen2. This example uses an SQL database containing Dynamics AX 2009 demo data, but the same approach can be used for any other legacy information system and any database with ADO.NET connectivity. This is just the beginning of the journey. Now that the data resides in ADLS gen2, it can be consumed by Power BI dashboards, Auto ML, Azure Data Factory and other workloads.

Notes 1 – Creating an ADLS gen2 storage account

To create an ADLS gen2 storage account, follow these steps:

  • Log in to https://portal.azure.com
  • Click on Storage accounts or search for the Storage accounts page in the search field
  • Click on Add
  • Select your subscription and the resource group (or create a new one)
  • Fill in the Name, Location, Account kind (StorageV2) and the rest of the parameters Azure Portal Creating ADLS
  • Follow the wizard and on the Advanced page, enable the Hierarchical namespace Azure Portal Creating ADLS HNS
  • Click next, review and create the storage account
  • In the Azure Storage Explorer, locate your subscription and the newly created storage account
  • After selecting the storage account, right click on Blob containers and create a new one
  • Set the access permissions on the container for the users and services that will be reading the data. Typical groups would include analysts and the Power BI service. It is important to perform this action now, as later changes will not be automatically reflected in files and directories created before such changes took place.
  • In this container, create a destination directory for our data export. You should see a configuration similar to this: Azure storage explorer viewing results

Notes 2 – Possible enhancements

  • This article does not cover the conversion of Dynamics AX enum values (integers) into human readable strings
  • Automatic generation of model.json is also not covered; model.json makes consuming the exported data from Power BI much easier
  • It also does not cover the creation of virtual single-field keys – for example a virtual column such as DataAreaId_CustAccount in SalesTable or DataAreaId_AccountNum in CustTable. This can be achieved by creating a setup table populated with this information by a Dynamics AX 2009 X++ job and modifying the Script Task
  • There is, of course, room for performance enhancements – for example by making the export and upload processing run in parallel