Databricks Spark jobs optimization techniques: Pandas UDF

Pandas UDF was introduced in Spark 2.3 and remains a useful technique for optimizing Spark jobs in Databricks. The idea behind Pandas UDF is to narrow the gap between processing big data with Spark and developing in Python. When Spark engineers develop in Databricks, they use the Spark DataFrame API to process or transform big data with native Spark functions.

However, Python has become the default language for data scientists building ML models, thanks to its huge ecosystem of toolkits and libraries. While developing ML models, developers may depend on libraries available in Python that Spark does not support natively (such as scikit-learn, which cannot be applied directly to a Spark DataFrame). Yet if developers write pure Python on Databricks, they barely take advantage of Spark's features, especially its parallel processing for big data.

Pandas UDF closes this gap: it applies Python functions directly to a Spark DataFrame, letting engineers and scientists develop in pure Python while still benefiting from Spark's parallel processing.

UDF vs Pandas UDF

UDF stands for "user-defined function" in Spark. Generally, Spark-native functions applied to a Spark DataFrame are vectorized and take full advantage of Spark's parallel processing. Although Spark already ships with plenty of mainstream functions that cover most use cases, we might still want to build custom functions, for example when migrating existing scripts or for developers who are not yet familiar with Spark.

For example, let's say we need a function to hash columns. Spark natively supports hash functions such as sha2 and md5, but a UDF lets us reuse the same hash-and-salt method across multiple columns. In addition, a UDF allows developers to build more complicated hash functions in pure Python, or to reuse functions they have already written. Converting the UDF into a Pandas UDF lets the function process the column in vectorized batches via Apache Arrow rather than row by row, which yields better performance than a plain UDF.

Native Spark Function

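To make the comparison concrete, here is a minimal sketch of the native approach, assuming a hypothetical DataFrame with `name` and `email` columns and a made-up salt value:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical example data; any string columns work the same way.
df = spark.createDataFrame(
    [("alice", "alice@example.com"), ("bob", "bob@example.com")],
    ["name", "email"],
)

SALT = "my-salt"  # hypothetical salt value

# Built-in functions only: concatenate the salt with each column,
# then apply Spark's native SHA-256 hash.
hashed = df.select(
    *[F.sha2(F.concat(F.lit(SALT), F.col(c)), 256).alias(c) for c in df.columns]
)
hashed.show(truncate=False)
```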

Spark Native Function: 

  • 11.11 seconds 
  • Always the fastest option when a native function exists 


UDF

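The same hash-and-salt logic as a plain UDF might look like the sketch below, reusing the same hypothetical data and salt as above. The function is called once per row, which is where the slowdown comes from:

```python
import hashlib

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("alice", "alice@example.com"), ("bob", "bob@example.com")],
    ["name", "email"],
)
SALT = "my-salt"  # hypothetical salt value

def hash_with_salt(value):
    # Plain Python: hashes a single value, invoked once per row.
    return hashlib.sha256((SALT + value).encode("utf-8")).hexdigest()

hash_udf = F.udf(hash_with_salt, StringType())

# The same UDF is reused across multiple columns.
hashed = df.select(*[hash_udf(F.col(c)).alias(c) for c in df.columns])
hashed.show(truncate=False)
```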

UDF: 

  • 31.84 seconds 
  • Easy to migrate. Much slower. 


Pandas UDF

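And a sketch of the Pandas UDF version (Spark 3.x type-hint style, same hypothetical data). Instead of one value at a time, the function receives whole batches of values as pandas Series via Apache Arrow:

```python
import hashlib

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("alice", "alice@example.com"), ("bob", "bob@example.com")],
    ["name", "email"],
)
SALT = "my-salt"  # hypothetical salt value

@pandas_udf("string")
def hash_pandas_udf(values: pd.Series) -> pd.Series:
    # Receives a whole batch of values as a pandas Series via Arrow.
    return values.map(
        lambda v: hashlib.sha256((SALT + v).encode("utf-8")).hexdigest()
    )

hashed = df.select(*[hash_pandas_udf(F.col(c)).alias(c) for c in df.columns])
hashed.show(truncate=False)
```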

Pandas UDF: 

  • 24.39 seconds 
  • Faster than UDF 


Native Spark functions will always give the best performance overall. However, when we have to develop a transformation that Spark does not support, or when it is easier for developers to work in pure Python, a Pandas UDF can noticeably improve Spark job performance over a plain UDF.

Grouped Map UDFs 

Another useful feature of Pandas UDF is the grouped map. The grouped map feature splits a Spark DataFrame into groups based on the groupBy condition and applies a user-defined function to each group, transforming the groups in parallel much like a native Spark function.

One useful scenario for grouped map is training multiple models, one per group, with a single training function. In pure Python, without any extra parallelism or groupby machinery, developers would prepare a training and testing dataset for each group and then train the models one by one. With a Grouped Map UDF, developers can apply the same function to every group simultaneously, as true parallel processing.

Sequential train

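A rough sketch of the sequential baseline, assuming a pandas DataFrame with hypothetical `group`, `x`, and `y` columns and a scikit-learn model:

```python
import pandas as pd
from sklearn.linear_model import LinearRegression

# Hypothetical training data with one model to fit per group.
pdf = pd.DataFrame({
    "group": ["a", "a", "b", "b"],
    "x": [1.0, 2.0, 3.0, 4.0],
    "y": [2.0, 4.0, 9.0, 12.0],
})

models = {}
for key, group_df in pdf.groupby("group"):
    # Each group's model is trained one after another, on a single machine.
    models[key] = LinearRegression().fit(group_df[["x"]], group_df["y"])
```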

Sequential train: 

  • 27.4 minutes 
  • Apply function on each group sequentially 


Spark Grouped Map Pandas UDF

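A sketch of the same training loop as a grouped map, using applyInPandas (the grouped-map entry point in Spark 3.x) on the same hypothetical columns; scikit-learn must be installed on the executors:

```python
import pandas as pd
from pyspark.sql import SparkSession
from sklearn.linear_model import LinearRegression

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame(
    pd.DataFrame({
        "group": ["a", "a", "b", "b"],
        "x": [1.0, 2.0, 3.0, 4.0],
        "y": [2.0, 4.0, 9.0, 12.0],
    })
)

def train_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Called once per group, on whichever executor holds that group's data.
    model = LinearRegression().fit(pdf[["x"]], pdf["y"])
    return pd.DataFrame(
        {"group": [pdf["group"].iloc[0]], "coef": [float(model.coef_[0])]}
    )

# Each group is handed to train_group as a pandas DataFrame, in parallel
# across the cluster.
results = sdf.groupBy("group").applyInPandas(
    train_group, schema="group string, coef double"
)
results.show()
```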

Spark Grouped Map Pandas UDF: 

  • 3.84 minutes 
  • Apply Pandas UDF on each group simultaneously 


There are 8 Spark executors in the cluster. After applying the Pandas UDF, the runtime improves by almost 8x, meaning all 8 groups are trained at the same time. The biggest benefit of the Grouped Map Pandas UDF is that it can easily be converted from a normal Python function, and it can be applied directly to a Spark DataFrame without first converting it to a pandas DataFrame.

Additional: Koalas 

In addition to Pandas UDF, the Spark community released a package called Koalas (since merged into Spark as the pandas API on Spark), which also targets optimizing Python in Spark environments. Instead of using the Spark DataFrame API, users can develop in pure Python with the familiar pandas API and still take advantage of Spark's parallel processing.

To put it in the context of Pandas UDF: Koalas lets you apply functions to a pandas-style DataFrame backed by Spark, while Pandas UDF applies functions to a Spark DataFrame.
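As a small illustration, here is a sketch of pandas-style code running on Spark via pyspark.pandas, the home of Koalas since Spark 3.2 (column names hypothetical):

```python
import pyspark.pandas as ps

# Looks and feels like pandas, but executes as distributed Spark jobs.
psdf = ps.DataFrame({
    "group": ["a", "a", "b", "b"],
    "x": [1.0, 2.0, 3.0, 4.0],
})

print(psdf.groupby("group")["x"].mean())
```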

In summary, we have three options: 

    1. Spark DataFrame API 
    2. Pandas UDF on Spark DataFrame 
    3. Koalas API (now the pandas API on Spark) on a pandas-style DataFrame 

All three will take advantage of parallel processing. 

Looking for more Databricks Spark job optimization tutorials? 

Check out some of the other techniques we’ve covered below: