3 Reasons Every Spark Data Scientist Will Love Spark 3

Louise Forbes
5 min read · Oct 26, 2021



Spark 3 has been out for just over a year (it was released in June 2020). I know I’m a bit late to this party, but better late than never, right? While there are MANY cool features in Spark 3 (see the full list here), I want to highlight three in the MLlib module that are of particular interest to Data Scientists.

  1. vector_to_array
  2. setWeightCol for a broad set of models
  3. StringIndexer multiple column support

vector_to_array

Every MLlib preprocessing algorithm outputs a column of type Vector, every model algorithm requires a features column of type Vector, and classification models output a column (also of type Vector) with the probability of each class. This is because machine learning algorithms operate on vectors, so their inputs need to be of Vector type.

Extracting individual elements from a Vector is not straightforward (the simple indexing methods used with Arrays, Lists and Seqs do not work on Vector columns), and this creates problems for Data Scientists. Something as simple as extracting the probability of the positive class is surprisingly challenging, and there are many StackOverflow questions on this exact topic.
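Before Spark 3, the usual workaround was a small UDF that indexes into the Vector. A minimal sketch (assuming a hypothetical DataFrame df with the standard probability column produced by a classifier):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// Pre-Spark-3 workaround: a UDF that pulls one element out of the ML Vector
val secondElement = udf { v: Vector => v(1) }
val withProb = df.withColumn("prob_1", secondElement($"probability"))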

Spark 3 now has the function vector_to_array (and, in later 3.x releases, the inverse array_to_vector) that converts your Vector column to an Array, after which you can use all the standard array functions to extract individual elements.

// Setting up the pipeline for the model
val assembler = new VectorAssembler()
val rfClass = new RandomForestClassifier()

val pipeline = new Pipeline()
  .setStages(Array(assembler, rfClass))

val model = pipeline.fit(testDataTbl)
val predictions = model.transform(testDataTbl)
val predictionOut = predictions.select("rawPrediction", "probability", "prediction")

predictionOut.show()

+------------------+---------------+------------+
| rawPrediction | probability | prediction |
+------------------+---------------+------------+
| [102.589,97.410] | [0.512,0.487] | 0.0 |
| [106.374,93.625] | [0.531,0.468] | 0.0 |
| [101.300,98.699] | [0.506,0.493] | 0.0 |
| [94.831,105.168] | [0.474,0.525] | 1.0 |
| [101.207,98.792] | [0.506,0.493] | 0.0 |
| [103.085,96.914] | [0.515,0.484] | 0.0 |
| [69.018,130.981] | [0.345,0.654] | 1.0 |
| [99.677,100.322] | [0.498,0.501] | 1.0 |
+------------------+---------------+------------+

As a side tangent, rawPrediction has been made public in Classification models in Spark 3 (so maybe there are four reasons in this article).

To extract the prediction probability:

import org.apache.spark.ml.functions.vector_to_array

val predictionsArray = predictionOut
  .withColumn("predArray", vector_to_array($"probability"))
  .withColumn("prob_1", $"predArray"(1))
  .select("probability", "prediction", "predArray", "prob_1")

predictionsArray.show()

+---------------+------------+----------------+--------+
| probability | prediction | predArray | prob_1 |
+---------------+------------+----------------+--------+
| [0.512,0.487] | 0.0 | [0.512,0.487] | 0.487 |
| [0.531,0.468] | 0.0 | [0.531,0.468] | 0.468 |
| [0.506,0.493] | 0.0 | [0.506,0.493] | 0.493 |
| [0.474,0.525] | 1.0 | [0.474,0.525] | 0.525 |
| [0.506,0.493] | 0.0 | [0.506,0.493] | 0.493 |
| [0.515,0.484] | 0.0 | [0.515,0.484] | 0.484 |
| [0.345,0.654] | 1.0 | [0.345,0.654] | 0.654 |
| [0.498,0.501] | 1.0 | [0.498,0.501] | 0.501 |
+---------------+------------+----------------+--------+

The columns probability and predArray look the same, but their column types are different:

predictionsArray.printSchema()
root
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)
|-- predArray: array (nullable = false)
| |-- element: double (containsNull = false)
|-- prob_1: double (nullable = true)
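The inverse mentioned earlier, array_to_vector (added to org.apache.spark.ml.functions in a later 3.x release), goes the other way. A minimal sketch, should you need to turn the array back into a Vector for another MLlib stage:

import org.apache.spark.ml.functions.array_to_vector

// Convert the array column back into an ML Vector column
val backToVector = predictionsArray
  .withColumn("probVector", array_to_vector($"predArray"))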

setWeightCol for a broad set of models

The setWeightCol parameter setter has been a feature of the LogisticRegression class since 1.6.0; it passes per-sample weights to the algorithm, which is very useful on imbalanced datasets to attain better results. Data Scientists have been waiting patiently for it to be implemented on the other models. Well, it’s finally here!

Spark 3 has implemented the setWeightCol parameter setter on the following models and evaluators:

  • DecisionTreeClassifier/Regressor
  • RandomForestClassifier/Regressor
  • GBTClassifier/Regressor
  • RegressionEvaluator
  • BinaryClassificationEvaluator
  • BisectingKMeans
  • KMeans
  • GaussianMixture

To assign the weight column you need to first create the column with the weights (there are many methods to do this).

import org.apache.spark.sql.functions.udf

val targetCount = dataTbl.filter($"target" === 1).count().toDouble
val tblCount = dataTbl.count().toDouble
val balancingRatio = targetCount / tblCount

// Weight the minority class (target = 1) more heavily than the majority class
val calculateWeights = udf { d: Double =>
  if (d == 0) balancingRatio
  else 1.0 - balancingRatio
}

val dataTblWeight = dataTbl.withColumn("classWeightCol", calculateWeights($"target"))

dataTblWeight.select("target", "classweightcol").show()

+--------+----------------+
| target | classweightcol |
+--------+----------------+
| 0 | 0.219 |
| 1 | 0.780 |
| 0 | 0.219 |
| 0 | 0.219 |
| 0 | 0.219 |
| 1 | 0.780 |
+--------+----------------+
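As an aside, a UDF isn’t strictly required here; the same weights can be built with when/otherwise. A minimal sketch reusing balancingRatio from above (the name dataTblWeight2 is hypothetical):

import org.apache.spark.sql.functions.when

// Same weighting logic as the UDF above, expressed with when/otherwise
val dataTblWeight2 = dataTbl.withColumn(
  "classWeightCol",
  when($"target" === 1, 1.0 - balancingRatio).otherwise(balancingRatio)
)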
val assembler = new VectorAssembler()
  .setOutputCol("features")

val dataTblVector = assembler.transform(dataTblWeight)

val rfClass = new RandomForestClassifier()
  .setLabelCol("target")
  .setFeaturesCol("features")
  .setWeightCol("classWeightCol")

val modelRF = rfClass.fit(dataTblVector)
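The evaluators on the list above accept the same column, so the weights can also be used when scoring the model. A minimal sketch with BinaryClassificationEvaluator, reusing modelRF and dataTblVector from above (the default metric is areaUnderROC):

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Weighted evaluation: setWeightCol on the evaluators is new in Spark 3
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("target")
  .setRawPredictionCol("rawPrediction")
  .setWeightCol("classWeightCol")

val weightedAuc = evaluator.evaluate(modelRF.transform(dataTblVector))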

StringIndexer multiple column support

The StringIndexer class is needed to convert StringType columns into numeric label indices (MLlib algorithms can only deal with numeric data). The single-column parameter setters setInputCol and setOutputCol are easy enough when you’re just converting the target column, but when you have multiple string fields it becomes tedious to have a StringIndexer for each column. There are many code snippets involving a for loop to convert every required column, along the lines of the sketch below.
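For reference, the pre-Spark-3 pattern looked roughly like this: one StringIndexer per column, built in a loop and applied one after the other (a sketch using the sampleDF DataFrame defined in the example further down):

import org.apache.spark.ml.feature.StringIndexer

// One StringIndexer per column, fitted and applied in turn
val columnsToIndex = Array("category", "type")
val indexers = columnsToIndex.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(s"${colName}Index")
}
val indexedOldWay = indexers.foldLeft(sampleDF) { (df, indexer) =>
  indexer.fit(df).transform(df)
}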

Enter Spark 3 with additional parameter setters of setInputCols and setOutputCols to convert multiple columns at once.

import org.apache.spark.ml.feature.StringIndexer

val sampleDF = spark.createDataFrame(
  Seq((0, "A", "type 1"), (1, "B", "type 2"), (2, "C", "type 2"), (3, "A", "type 2"), (4, "A", "type 1"), (5, "C", "type 1"))
).toDF("id", "category", "type")

sampleDF.show()

+----+----------+--------+
| id | category | type |
+----+----------+--------+
| 0 | A | type 1 |
| 1 | B | type 2 |
| 2 | C | type 2 |
| 3 | A | type 2 |
| 4 | A | type 1 |
| 5 | C | type 1 |
+----+----------+--------+
val indexer = new StringIndexer()
.setInputCols(Array("category", "type"))
.setOutputCols(Array("categoryIndex", "typeIndex"))
val indexedDF = indexer.fit(sampleDF).transform(sampleDF)

indexedDF.show()

+----+----------+--------+---------------+-----------+
| id | category | type | categoryIndex | typeIndex |
+----+----------+--------+---------------+-----------+
| 0 | A | type 1 | 0.0 | 0.0 |
| 1 | B | type 2 | 2.0 | 1.0 |
| 2 | C | type 2 | 1.0 | 1.0 |
| 3 | A | type 2 | 0.0 | 1.0 |
| 4 | A | type 1 | 0.0 | 0.0 |
| 5 | C | type 1 | 1.0 | 0.0 |
+----+----------+--------+---------------+-----------+

I’m sure you’ll agree that these updates are a huge efficiency boost for Data Scientists. Come join the Spark 3 party!
