Generating model.json as a part of SSIS export of legacy data into ADLS

Introduction

In the previous article, we saw how to export data from all tables, or from selected tables only, into ADLS using an SSIS control flow. In this article, we will extend that example with a model.json export, so that the data can later be consumed easily using Power BI Dataflows.

Adding a new script task to generate model.json

First, add a new Script Task to the control flow:
Control flow with model_json

You will need to introduce a new variable EXPORT_PATH_ADLS of type string:
Control flow with model_json introducing new variable

Set it to the path prefix of the blob URL plus the directory path. In the new Export model_json Script Task editor, set the read-only variables to User::DB_CONNECTION, User::EXPORT_PATH, User::EXPORT_PATH_ADLS, User::TABLE_NAMES:
Control flow with export model_json task editor

After clicking on the Edit script, replace the Main method with the following C# code to generate model.json in the same directory as the .csv files:

{
    // Writes a CDM "model.json" manifest into the EXPORT_PATH directory (next to the .csv
    // files). For every table name found in TABLE_NAMES it emits one LocalEntity whose
    // attributes are the table's columns (as returned by GetColumnInfo, which skips BLOBs)
    // and whose single partition points at "<EXPORT_PATH_ADLS>/<table>.csv".
    // Sets Dts.TaskResult to Success, or to Failure if anything throws.
    object rawDBConnection = null;
    string dbConnection = null;
    try
    {
        // Retrieve variables. A missing variable is reported through FireError and then
        // aborts the task via the exception handler below.
        foreach (string requiredVariable in new[] { "TABLE_NAMES", "EXPORT_PATH", "EXPORT_PATH_ADLS", "DB_CONNECTION" })
        {
            if (!Dts.Variables.Contains(requiredVariable))
            {
                Dts.Events.FireError(18, requiredVariable, "Variable missing", "", 0);
                throw new Exception("Variable " + requiredVariable + " is missing.");
            }
        }
        var tableNames = (DataSet)Dts.Variables["TABLE_NAMES"].Value;
        string exportPathADLS = (string)Dts.Variables["EXPORT_PATH_ADLS"].Value;
        string exportPath = (string)Dts.Variables["EXPORT_PATH"].Value;
        dbConnection = (string)Dts.Variables["DB_CONNECTION"].Value;

        // Retrieve DB connection; the connection manager is expected to hand out a SqlConnection.
        rawDBConnection = Dts.Connections[dbConnection].AcquireConnection(Dts.Transaction);
        SqlConnection sqlConnection = (SqlConnection)rawDBConnection;

        // Minimal JSON string escaping so that a quote or backslash in a table name, column
        // name or path cannot break the document. Control characters are not handled.
        Func<string, string> jsonEscape = s => s.Replace("\\", "\\\\").Replace("\"", "\\\"");

        string partitionRoot = jsonEscape(exportPathADLS.TrimEnd('/'));

        using (var sw = new StreamWriter(Path.Combine(exportPath, "model.json"), false) { NewLine = "\n" })
        {
            sw.Write("{\n");
            sw.Write("  \"application\": \"Dynamics AX 2009\",\n");
            sw.Write("  \"name\": \"Archive\",\n");
            sw.Write("  \"description\": \"Archive\",\n");
            sw.Write("  \"version\": \"1.0\",\n");
            sw.Write("  \"culture\": \"en-US\",\n");
            // "o" is the culture-invariant ISO 8601 round-trip format (e.g. 2020-01-31T12:00:00.0000000Z).
            // The previous "U" format produced a culture-dependent long date text.
            sw.Write("  \"modifiedTime\": \"" + DateTime.UtcNow.ToString("o") + "\",\n");
            sw.Write("  \"annotations\": [\n");
            sw.Write("    {\n");
            sw.Write("      \"name\": \"D365FO:ApplicationBuildVersion\",\n");
            sw.Write("      \"value\": \"10.0.319.10005\"\n");
            sw.Write("    },\n");
            sw.Write("    {\n");
            sw.Write("      \"name\": \"D365FO:ModelName\",\n");
            sw.Write("      \"value\": \"Archive\"\n");
            sw.Write("    },\n");
            sw.Write("    {\n");
            sw.Write("      \"name\": \"D365FO:ModelPublisher\",\n");
            sw.Write("      \"value\": \"Demo\"\n");
            sw.Write("    }\n");
            sw.Write("  ],\n");
            sw.Write("  \"entities\": [\n");

            StringBuilder sb = new StringBuilder();
            bool firstEntity = true;
            foreach (DataTable dataTable in tableNames.Tables)
            {
                foreach (DataRow dataRow in dataTable.Rows)
                {
                    // "as string" turns a NULL (DBNull) cell into null, which is then skipped
                    // together with empty/whitespace names instead of failing the cast.
                    string currentTableName = dataRow[0] as string;
                    if (String.IsNullOrWhiteSpace(currentTableName))
                    {
                        continue;
                    }

                    // Entities are separated by ",\n"; the last one gets no trailing comma.
                    if (!firstEntity)
                    {
                        sb.Append(",\n");
                    }
                    firstEntity = false;

                    string entityName = jsonEscape(currentTableName);
                    sb.Append("    {\n");
                    sb.Append("      \"$type\": \"LocalEntity\",\n");
                    sb.Append("      \"name\": \"" + entityName + "\",\n");
                    sb.Append("      \"description\": \"" + entityName + "\",\n");
                    sb.Append("      \"attributes\": [\n");

                    var columnInfo = GetColumnInfo(sqlConnection, currentTableName);
                    string[] columnNames = GetColumnNames(columnInfo);
                    string[] columnTypes = GetColumnTypes(columnInfo);

                    for (int i = 0; i < columnNames.Length; i++)
                    {
                        // Map the SQL data type to a model.json data type; anything that is
                        // not listed explicitly (nvarchar, ntext, uniqueidentifier, ...) is a string.
                        string currentColumnTypeJSON;
                        switch (columnTypes[i])
                        {
                            case "int":
                            case "bigint":
                                currentColumnTypeJSON = "int64";
                                break;
                            case "datetime":
                                currentColumnTypeJSON = "dateTime";
                                break;
                            case "numeric":
                                currentColumnTypeJSON = "double";
                                break;
                            default:
                                currentColumnTypeJSON = "string";
                                break;
                        }

                        string attributeName = jsonEscape(columnNames[i]);
                        sb.Append("        {\n");
                        sb.Append("          \"name\": \"" + attributeName + "\",\n");
                        sb.Append("          \"description\": \"" + attributeName + "\",\n");
                        sb.Append("          \"dataType\": \"" + currentColumnTypeJSON + "\"\n");
                        sb.Append(i < (columnNames.Length - 1) ? "        },\n" : "        }\n");
                    }
                    sb.Append("      ],\n");
                    sb.Append("      \"partitions\": [\n");
                    sb.Append("        {\n");
                    sb.Append("          \"name\": \"" + entityName + "\",\n");
                    sb.Append("          \"location\": \"" + partitionRoot + "/" + entityName + ".csv\"\n");
                    sb.Append("        }\n");
                    sb.Append("      ]\n");
                    sb.Append("    }");
                }
            }
            sw.Write(sb.ToString());

            sw.Write("\n");
            sw.Write("  ],\n");
            sw.Write("  \"relationships\": [\n");
            sw.Write("  ]\n");
            sw.Write("}\n");
        }

        Dts.TaskResult = (int)ScriptResults.Success;
    }
    catch (Exception e)
    {
        Dts.Log(e.Message, (int)ScriptResults.Failure, null);
        Dts.TaskResult = (int)ScriptResults.Failure;
    }
    finally
    {
        // Hand the connection back on both the success and the failure path. Releasing is
        // best-effort: a failure here is reported as a warning and does not change the result.
        if (rawDBConnection != null)
        {
            try
            {
                Dts.Connections[dbConnection].ReleaseConnection(rawDBConnection);
            }
            catch (Exception releaseError)
            {
                Dts.Events.FireWarning(0, "DB_CONNECTION", "Releasing the connection failed: " + releaseError.Message, "", 0);
            }
        }
    }
}

/// <summary>
/// Reads the column names and SQL data types of a table from INFORMATION_SCHEMA.COLUMNS.
/// Columns matching the 'DEL_%' pattern and columns of type "image" are left out.
/// </summary>
/// <param name="sqlConnection">Connection used to run the query.</param>
/// <param name="tableName">Name of the table whose columns are listed.</param>
/// <returns>One (column name, data type) tuple per remaining column; empty if nothing matches.</returns>
private List<Tuple<string, string>> GetColumnInfo(SqlConnection sqlConnection, string tableName)
{
    var retVal = new List<Tuple<string, string>>();

    // NOTE(review): '_' is a single-character wildcard in LIKE, so 'DEL_%' also excludes
    // columns such as DELIVERYDATE, not only those literally prefixed with "DEL_".
    // Left unchanged here because the column list must stay in sync with whatever filter
    // the CSV export uses - confirm and fix both together (e.g. 'DEL[_]%').
    const string query =
        "SELECT COLUMN_NAME,DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@tableName AND COLUMN_NAME NOT LIKE 'DEL_%'";

    using (var sqlCommand = new SqlCommand(query, sqlConnection))
    {
        // Pass the table name as a parameter instead of concatenating it into the SQL text,
        // so a quote in the name can neither break nor alter the statement.
        // A null name is sent as an empty string, which (as before) matches no table.
        sqlCommand.Parameters.AddWithValue("@tableName", tableName ?? String.Empty);

        using (var reader = sqlCommand.ExecuteReader())
        {
            while (reader.Read())
            {
                var columnName = reader.GetString(0);
                var columnType = reader.GetString(1);
                // BLOB ("image") columns are skipped.
                if (!String.Equals(columnType, "image", StringComparison.OrdinalIgnoreCase))
                {
                    retVal.Add(new Tuple<string, string>(columnName, columnType));
                }
            }
        }
    }
    return retVal;
}
/// <summary>
/// Extracts the column names (first tuple item) in the same order as the input list.
/// </summary>
/// <param name="columns">(column name, data type) tuples.</param>
/// <returns>Array with one name per input tuple.</returns>
private string[] GetColumnNames(List<Tuple<string, string>> columns)
{
    // The element type is required: the non-generic "new List()" does not compile.
    var retVal = new List<string>(columns.Count);
    foreach (var column in columns)
    {
        retVal.Add(column.Item1);
    }
    return retVal.ToArray();
}
/// <summary>
/// Extracts the column data types (second tuple item) in the same order as the input list.
/// </summary>
/// <param name="columns">(column name, data type) tuples.</param>
/// <returns>Array with one data type per input tuple.</returns>
private string[] GetColumnTypes(List<Tuple<string, string>> columns)
{
    // The element type is required: the non-generic "new List()" does not compile.
    var retVal = new List<string>(columns.Count);
    foreach (var column in columns)
    {
        retVal.Add(column.Item2);
    }
    return retVal.ToArray();
}

Running the script, you should see something similar to the following screen:

Control flow with model_json result in storage explorer

Summary

With a small extension to the SSIS control flow designed in the previous article, we turned our data-lake-based archive into a CDM (Common Data Model) folder. This allows data analysts to use the data easily in Power BI dashboards by connecting to ADLS using Power BI Dataflows.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s