Using MegaSparkDiff To Test ETLs At Scale

Authors: Ahmed Ibrahim and Matthew Gillett


Do you need to test ETL (Extract, Transform, Load) operations at scale? Verifying that a data transformation produced the correct output can be tricky; you are essentially comparing apples to oranges. More specifically, you need to transform your inputs (apples) into expected oranges and then compare those to the actual oranges.

In practice, this means that an SDET tests the ETL process implemented by the developers with a parallel implementation: apply the same transformation independently, then compare the outputs of the QA transformation to the actual outputs of the development code base. Enter MegaSparkDiff, FINRA’s open source Spark-based data comparison tool.

The Problem Statement


Let's take for example an ETL that:

  1. Reads input table appliance from an H2 database
  2. Transforms column NAME by splitting the text on the “,” character:
    1. The first portion represents the appliance description and is stored in output column name after capitalization
    2. The second portion is the appliance brand and is stored in output column brand after capitalization
  3. Calculates a new column called units_sold via a round-up integer division of the input columns SALES_AMOUNT and PRICE (for the first source row, 1000.00 / 250.00 = 4)
  4. Replaces the TYPE column with its corresponding ID value from the reference data table appliance_type stored in a Postgres database
  5. Stores the result in table appliance in a Postgres database

Next, let’s test the ETL process via a parallel implementation that can operate at scale without requiring too much effort to implement the logic. Here is where MegaSparkDiff truly shines.

The tables are described as follows:

The source table appliance is defined in an H2 database as follows:

CREATE TABLE APPLIANCE
(
   NAME          VARCHAR(50),
   TYPE          VARCHAR(20),
   SALES_AMOUNT  DECIMAL(10,2),
   PRICE         DECIMAL(10,2),
   DATE_ADDED    DATE
);

The target table appliance is defined in a Postgres database as follows:

CREATE TABLE appliance
(
   name        varchar(25),
   brand       varchar(25),
   type        bigint,
   units_sold  integer,
   price       numeric(10,2),
   date_added  date
);

The reference data table appliance_type is defined in a Postgres database as follows:

CREATE TABLE appliance_type
(
   type_id    bigint,
   type_name  varchar(20)
);

The source data in the appliance table in the H2 database that we will transform:

NAME                            TYPE          SALES_AMOUNT  PRICE   DATE_ADDED
some refrigerator,some brand 1  refrigerator  1000.00       250.00  2017-06-01
some washer,some brand 2        washer        5000.00       500.00  2017-01-17
some dryer,some brand 3         dryer         2500.00       500.00  2017-04-23

The data in the target appliance table in the Postgres database:

name               brand         type  units_sold  price   date_added
some refrigerator  some brand 1  1     4           250.00  2017-06-01
some washer        some brand 2  2     10          500.00  2017-01-17
some dryer         some brand 3  3     5           500.00  2017-04-23

The data in the reference data table appliance_type in the Postgres database:

type_id  type_name
1        refrigerator
2        washer
3        dryer

Spark UDFs and MegaSparkDiff


The execution plan of the QA process is simple: (a) conduct a parallel QA ETL and (b) compare our QA outputs to the actual outputs from the development code base. Our algorithm can be summarized as follows (a skeleton of the test that hosts these steps is sketched after the list):

  1. Initialize our data environment
    1. Initialize Spark Factory
    2. Acquire source data appliance from H2 database and lower case the column names
    3. Acquire target data appliance from Postgres database
    4. Acquire reference data table appliance_type from Postgres database

  2. Split the column NAME into a column name holding the capitalized values before the “,” and a column brand holding the capitalized values after the “,”. To do this, we will:
    1. Register UDF split_name that will take the data from NAME before the “,” character and capitalize the result.
    2. Register UDF split_brand that will take the data from NAME after the “,” character and capitalize the result.
    3. Apply the UDF split_name to the dataframe while assigning the output of the UDF to a new column name_temp
    4. Apply the UDF split_brand to the dataframe while assigning the output of the UDF to a new column brand
    5. Drop the column called name containing the original full value, which is no longer needed
    6. Rename the column name_temp to name

  3. Create a new column called units_sold based on a rounded-up integer division of SALES_AMOUNT by PRICE
    1. Register a UDF calculate_units_sold that divides the columns SALES_AMOUNT and PRICE and returns a rounded-up integer
    2. Apply the UDF to the dataframe while assigning the output of the UDF to the new column units_sold

  4. Convert the values in the column TYPE from its text form into its ID value via a join operation with the reference data table appliance_type that exists in the Postgres database:
    1. Since the type_name column in the reference data table is capitalized, we register UDF capitalize_type that will capitalize all the data in column TYPE and apply the UDF
    2. Join the source table appliance to the reference data table appliance_type on TYPE equal to type_name
    3. Drop the column called type from the dataframe while selecting the type_id column from the reference data.
    4. Rename the column type_id to type

  5. Compare our QA outputs dataframe to the actual outputs from the development ETL
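
In practice these steps live together in a single test class. Below is a minimal JUnit skeleton using the same setup shown in this article; the class and method names are illustrative, and each numbered step is implemented in detail in the next section.

// Hypothetical test skeleton hosting the QA comparison; names are illustrative.
public class ApplianceEtlTest {

  @BeforeClass
  public static void setUp() {
    // Step 1.1: initialize the Spark Factory once for the test run
    SparkFactory.initializeSparkLocalMode("local[*]", "WARN", "1");
  }

  @Test
  public void transformedSourceMatchesTarget() {
    // Steps 1.2-1.4: acquire the source, target, and reference AppleTables
    // Steps 2-4: register the UDFs and apply the transformations to the source dataframe
    // Step 5: compare the transformed AppleTable to the target AppleTable
    //         and assert that both difference counts are zero
  }
}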

The Detailed Implementation


  1. Initialize our data environment
    1. Initialize Spark Factory:

      SparkFactory.initializeSparkLocalMode("local[*]", "WARN", "1");

    2. Acquire source data appliance from H2 database and lower case the column names:

      // Parallelize the source data
      AppleTable leftTable = SparkFactory
          .parallelizeJDBCSource("org.h2.Driver",
              H2Database.getUrl(),
              H2Database.getProperties().getProperty("user"),
              H2Database.getProperties().getProperty("password"),
              "(select * from appliance) a", "appliance_left");

      Dataset<Row> leftTableTransformDF = leftTable.getDataFrame();

      // Lower case all the columns in the source dataframe.
      for (String column : leftTableTransformDF.columns()) {
        leftTableTransformDF = leftTableTransformDF.withColumnRenamed(column, column.toLowerCase());
      }

    3. Acquire target data appliance from Postgres database:

      // Parallelize the target data
      AppleTable rightTable = SparkFactory
          .parallelizeJDBCSource("org.postgresql.Driver",
              PostgresDatabase.getUrl(),
              PostgresDatabase.getProperties().getProperty("user"),
              PostgresDatabase.getProperties().getProperty("password"),
              "(select * from appliance) a", "appliance_right");

    4. Acquire reference data table appliance_type from Postgres database:

      // Parallelize the reference data
      AppleTable typeTable = SparkFactory
          .parallelizeJDBCSource("org.postgresql.Driver",
              PostgresDatabase.getUrl(),
              PostgresDatabase.getProperties().getProperty("user"),
              PostgresDatabase.getProperties().getProperty("password"),
              "(select * from appliance_type) a", "appliance_type");

  2. Split the column NAME into a column name holding the capitalized values before the “,” and a column brand holding the capitalized values after the “,”. To do this, we will:
    1. Register UDF split_name that will take the data from NAME before the “,” character and capitalize the result.
    2. Register UDF split_brand that will take the data from NAME after the “,” character and capitalize the result.

      // Handle the source's "NAME" column transformation/split to the target's "name" and "brand"
      // columns.
      // First register two new UDFs, giving the UDFs a name, a lambda, and a data type to return.
      // We use the WordUtils.capitalize method of the Apache Commons Lang library to capitalize
      // the first character of each word.
      // Both UDFs return a String type, since the column data type is the same among both tables.

      // For the target name, the lambda gets the data before the comma.
      SparkFactory.sparkSession().udf().register("split_name",
          (String x) -> WordUtils.capitalize(x.substring(0, x.indexOf(","))),
          DataTypes.StringType);

      // And for the target brand, the lambda gets the data after the comma.
      SparkFactory.sparkSession().udf().register("split_brand",
          (String x) -> WordUtils.capitalize(x.substring(x.indexOf(",") + 1)),
          DataTypes.StringType);

    3. Apply the UDF split_name to the dataframe while assigning the output of the UDF to a new column name_temp
    4. Apply the UDF split_brand to the dataframe while assigning the output of the UDF to a new column brand
    5. Drop the column called name containing the original full value, which is no longer needed
    6. Rename the column name_temp to name

      // Call withColumn operations, passing the source "NAME" to the UDFs "split_name" and
      // "split_brand" and storing the results in "name_temp" and "brand" respectively.
      // Then drop column "name" and rename "name_temp" to "name".
      leftTableTransformDF = leftTableTransformDF
          .withColumn("name_temp",
              functions.callUDF("split_name", functions.col("name")))
          .withColumn("brand",
              functions.callUDF("split_brand", functions.col("name")))
          .drop("name")
          .withColumnRenamed("name_temp", "name");

  3. Create a new column called units_sold based on a rounded-up integer division of SALES_AMOUNT by PRICE
    1. Register a UDF calculate_units_sold that divides the columns SALES_AMOUNT and PRICE and returns a rounded-up integer

      // Handle the round-up integer division of "SALES_AMOUNT" and "PRICE" to determine units_sold.
      SparkFactory.sparkSession().udf().register("calculate_units_sold",
          (BigDecimal x, BigDecimal y) -> Integer.valueOf(
              x.divide(y, BigDecimal.ROUND_HALF_UP).setScale(0, BigDecimal.ROUND_HALF_UP).toString()
          ),
          DataTypes.IntegerType);

    2. Apply the UDF to the dataframe while assigning the output of the UDF to the new column units_sold

      // Call the withColumn operation, passing both the source SALES_AMOUNT and PRICE columns to the
      // UDF "calculate_units_sold" and storing the result in column "units_sold".
      leftTableTransformDF = leftTableTransformDF
          .withColumn("units_sold",
              functions.callUDF("calculate_units_sold", functions.col("sales_amount"),
                  functions.col("price")));

  4. Convert the values in the column TYPE from its text form into its ID value via a join operation with the reference data table appliance_type that exists in the Postgres database:
    1. Since the type_name column in the reference data table is capitalized, we register UDF capitalize_type that will capitalize all the data in column TYPE and apply the UDF

      // Handle the capitalization of the first letter of each word of the source's "TYPE".
      SparkFactory.sparkSession().udf().register("capitalize_type",
          (String x) -> WordUtils.capitalize(x),
          DataTypes.StringType);

      // Call the withColumn operation, passing the "TYPE" column to the UDF "capitalize_type" and
      // storing the result in column "type".
      leftTableTransformDF = leftTableTransformDF
          .withColumn("type",
              functions.callUDF("capitalize_type", functions.col("type")));

    2. Join the source table appliance to the reference data table appliance_type on TYPE equal to type_name
    3. Drop the column called type from the dataframe while selecting the type_id column from the reference data
    4. Rename the column type_id to type

      // Join with the reference table to get the "type_id" value, drop the original "type" column,
      // and rename "type_id" to "type".
      leftTableTransformDF = leftTableTransformDF.as("a").join(typeTable.getDataFrame().as("b"),
          leftTableTransformDF.col("type").equalTo(typeTable.getDataFrame().col("type_name")),
          "leftouter")
          .select("a.*", "b.type_id")
          .drop("type")
          .withColumnRenamed("type_id", "type");

  5. Compare our QA outputs dataframe to the actual outputs from the development ETL
    1. At this point the data is expected to match between the transformed left table and the right table; however, the columns in the transformed left table are not in the same order as in the right table. To handle this, just perform a select operation using all of the columns in the target AppleTable DataFrame:

      // Select all columns in transformed left dataframe that exist in right dataframe, preserving
      // order of columns in right dataframe.
      leftTableTransformDF = leftTableTransformDF.selectExpr(rightTable.getDataFrame().columns());

    2. Once all the transformations are declared, we need to update the view:

      // Update the view of the transformed left dataframe
      leftTableTransformDF.createOrReplaceTempView(leftTable.getTempViewName());

    3. Then we just have to create a new AppleTable object for leftTable with the latest dataframe:

      // Create a new AppleTable with the transformed dataframe
      AppleTable leftTableTransform = new AppleTable(leftTable.getSourceType(), leftTableTransformDF,
          leftTable.getDelimiter(), leftTable.getTempViewName());

    4. Finally, we compare the two AppleTables and assert that both the left and right difference counts are 0:

      // Comparison of transformed left dataframe and right dataframe
      Pair<Dataset<Row>, Dataset<Row>> result = SparkCompare
          .compareAppleTables(leftTableTransform, rightTable);

      Assert.assertEquals(0, result.getLeft().count());
      Assert.assertEquals(0, result.getRight().count());

    5. If both assertions pass, then we know that the data matches between the two AppleTables and that our QA transformation matches the result of the development ETL. If they fail, the sketch below shows a quick way to inspect the differences.
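
A minimal sketch for inspecting any differences, using the standard Dataset show method:

// If either count is non-zero, print the mismatched rows for inspection.
// getLeft() and getRight() hold the rows that did not match on each side.
result.getLeft().show(20, false);
result.getRight().show(20, false);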

What About Delimited Files?


It is also possible to handle transformations with flat file sources. In this case, we just need to add a few additional operations. Take an example input file that contains field values separated by “;” and rows separated by line feeds:

some refrigerator,some brand 1;refrigerator;1000.00;250.00;2017-06-01
some washer,some brand 2;washer;5000.00;500.00;2017-01-17
some dryer,some brand 3;dryer;2500.00;500.00;2017-04-23
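
The file itself is parallelized into an AppleTable, analogous to the JDBC sources. The snippet below is a sketch only: the exact name and signature of the text-source helper may differ across MegaSparkDiff versions, and the file path is illustrative, so check the project documentation for the current API.

// Assumed text-source helper and illustrative file path; verify against your
// MegaSparkDiff version before using it.
AppleTable leftTable = SparkFactory.parallelizeTextSource("appliance_source.txt", "appliance_left");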

First, before any of the transformations, we need to add a schema to the DataFrame and map the data accordingly. It is sufficient to use a String data type for all of the fields, but this will need to be accounted for in the transformations (a sketch of one such adjustment appears at the end of this section):

// Create a list of the source column names
List<String> fieldNamesLeft = Arrays
    .asList("name", "type", "sales_amount", "price", "date_added");

// Create the schema for each column with the String data type and nullable property of true
List<StructField> structFieldsLeft = new ArrayList<>();
for (String fieldNameLeft : fieldNamesLeft) {
  structFieldsLeft.add(DataTypes.createStructField(fieldNameLeft, DataTypes.StringType, true));
}

StructType leftSchema = DataTypes.createStructType(structFieldsLeft);

With the schema defined, we now map the DataFrame’s data to the schema. Since the DataFrame for the file type contains each row in a single String value, we get the String inside the lambda and split by the delimiter. This code should replace the column renaming from uppercase to lowercase that was done on the JDBC source:

// Create a dataframe containing the schema defined above, with data populated by splitting the
// text file by character ";"
Dataset<Row> leftTableTransformDF = leftTable.getDataFrame().map((MapFunction<Row, Row>) x -> {
  Object[] columns = x.getString(0).split(";");
  return RowFactory.create(columns);
}, RowEncoder.apply(leftSchema));

After all the transformations are declared and the columns ordered properly, we flatten the DataFrame using the same delimiter provided to the target AppleTable (which is the default of “,” in this case):

// Flatten the transformed dataframe
leftTableTransformDF = SparkFactory.flattenDataFrame(leftTableTransformDF, ",");

Now we are able to perform the same transformations on flat files and resume our algorithm at step 2.
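
As noted above, reading every field as a String means numeric transformations have to parse their inputs before doing arithmetic. The following is a minimal sketch of a String-based variant of the calculate_units_sold registration; this variant is an illustration, not part of the original implementation.

// Hypothetical String-based variant of calculate_units_sold for the flat-file
// case, where sales_amount and price arrive as Strings.
SparkFactory.sparkSession().udf().register("calculate_units_sold",
    (String x, String y) -> new BigDecimal(x)
        .divide(new BigDecimal(y), 0, BigDecimal.ROUND_HALF_UP)
        .intValue(),
    DataTypes.IntegerType);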

Conclusion


Using UDFs in conjunction with MegaSparkDiff to test ETLs is beneficial even when you are not taking full advantage of MegaSparkDiff’s data comparison at scale. MegaSparkDiff improves the readability of the tests because it allows the transformations to be broken into separate methods; it also allows transformations to be coded in Java, which can make more complex scenarios easier to handle.
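
For example, each transformation can live in its own focused method; the sketch below is illustrative (the method names are not part of MegaSparkDiff).

// Hypothetical organization of the QA transformations into separate methods.
private static Dataset<Row> splitNameAndBrand(Dataset<Row> df) {
  return df
      .withColumn("name_temp", functions.callUDF("split_name", functions.col("name")))
      .withColumn("brand", functions.callUDF("split_brand", functions.col("name")))
      .drop("name")
      .withColumnRenamed("name_temp", "name");
}

private static Dataset<Row> calculateUnitsSold(Dataset<Row> df) {
  return df.withColumn("units_sold",
      functions.callUDF("calculate_units_sold",
          functions.col("sales_amount"), functions.col("price")));
}

// Usage, once the UDFs are registered:
// leftTableTransformDF = calculateUnitsSold(splitNameAndBrand(leftTableTransformDF));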

For additional information on MegaSparkDiff, visit FINRA’s GitHub page and make sure to check out our code samples.