Imbalanced classes are a common problem. Scikit-learn provides an easy fix: “balancing” class weights. This makes models (e.g., logistic regression) more likely to predict the less common classes.

The PySpark ML API doesn’t have this same functionality, so in this blog post, I describe how to balance class weights yourself.

```python
from itertools import chain

import numpy as np
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start a local Spark session for this example.
spark = SparkSession.builder.master("local").appName("Example").getOrCreate()
```

Generate some random data and put the data in a Spark DataFrame. Note that the input variables are not predictive. The model will behave randomly. This is okay, since I am not interested in model accuracy.

```python
# 10,000 rows of 10 standard-normal features that carry no signal.
X = np.random.normal(0, 1, (10000, 10))

# Labels: 10% class 0, 90% class 1, in random order.
y = np.ones(X.shape[0]).astype(int)
y[:1000] = 0
np.random.shuffle(y)
print(np.mean(y))  # 0.9

# Append the label as an 11th column and rename it from "10" to "y".
X = np.append(X, y.reshape((10000, 1)), 1)
DF = spark.createDataFrame(pd.DataFrame(X))
DF = DF.withColumnRenamed("10", "y")
```

Here’s how Scikit-learn computes class weights when “balanced” weights are requested.

```python
# Formula from the scikit-learn documentation for class_weight="balanced":
# https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html
# n_samples / (n_classes * np.bincount(y))
classes = np.unique(y)
weights = len(y) / (len(classes) * np.bincount(y))
class_weights = dict(zip(classes.tolist(), weights.tolist()))
print(class_weights)  # {0: 5.0, 1: 0.5555555555555556}
```
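
To see why these weights count as “balanced”, it may help to check that each class ends up carrying the same total weight. The sketch below redoes the arithmetic in plain Python for the 1,000 / 9,000 split used in this post. The helper name `balanced_weights` is made up for illustration and is not part of Scikit-learn.

```python
from collections import Counter


def balanced_weights(labels):
    """Return {label: n_samples / (n_classes * count)} for an iterable of labels."""
    counts = Counter(labels)
    n_samples = sum(counts.values())
    n_classes = len(counts)
    return {label: n_samples / (n_classes * count) for label, count in counts.items()}


# Same 10% / 90% split as the data in this post.
labels = [0] * 1000 + [1] * 9000
counts = Counter(labels)
weights = balanced_weights(labels)

# Total weight carried by each class: count * weight.
class_totals = {label: counts[label] * weights[label] for label in counts}
print(weights)
print(class_totals)
```

Each class contributes about 5,000 units of weight (that is, n_samples / n_classes), so neither class dominates the loss.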

Here’s how we can compute “balanced” weights with data from a PySpark DataFrame.

```python
# One row per class with its count; only this small result is collected.
y_collect = DF.groupBy("y").count().collect()
total_y = sum(row["count"] for row in y_collect)
unique_y_count = len(y_collect)

# Same formula as above: n_samples / (n_classes * class_count).
class_weights_spark = {
    row["y"]: total_y / (unique_y_count * row["count"]) for row in y_collect
}
print(class_weights_spark)  # {0.0: 5.0, 1.0: 0.5555555555555556} (key order may vary)
```

PySpark needs to have a weight assigned to each instance (i.e., row) in the training set. I create a mapping to apply a weight to each training instance.

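```python
# Map literal built from alternating key, value arguments, e.g. 0.0, 5.0, 1.0, 0.56.
mapping_expr = F.create_map([F.lit(x) for x in chain(*class_weights_spark.items())])
# Look up each row's weight by label ([] replaces getItem(Column), deprecated in Spark 3.0).
DF = DF.withColumn("weight", mapping_expr[F.col("y")])
```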
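
The `chain(*class_weights_spark.items())` call can look cryptic. As a small illustration in plain Python (no Spark needed), it flattens the dictionary into the alternating key, value sequence that `create_map` expects. The dictionary literal below is copied from the printed output earlier in this post.

```python
from itertools import chain

class_weights_spark = {0.0: 5.0, 1.0: 0.5555555555555556}

# items() yields (key, value) pairs; chain(*pairs) concatenates them into one flat sequence.
flat = list(chain(*class_weights_spark.items()))
print(flat)  # [0.0, 5.0, 1.0, 0.5555555555555556]
```

Each element of that flat sequence is then wrapped in `F.lit` so Spark treats it as a constant column.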

I assemble all the input features into a vector.

```python
# Combine feature columns "0".."9" into one vector column, then drop the originals.
feature_cols = [str(x) for x in range(10)]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
DF = assembler.transform(DF).drop(*feature_cols)
```

Then I train a logistic regression. Without the instance weights, the model predicts the frequent class for every instance.

```python
lr = LogisticRegression(featuresCol="features", labelCol="y")
lrModel = lr.fit(DF)
lrModel.transform(DF).agg(F.mean("prediction")).show()
```

```
+---------------+
|avg(prediction)|
+---------------+
|            1.0|
+---------------+
```

With the weights, the model assigns roughly half the instances to each class (even the less common one).

```python
lr = LogisticRegression(featuresCol="features", labelCol="y", weightCol="weight")
lrModel = lr.fit(DF)
lrModel.transform(DF).agg(F.mean("prediction")).show()
```

```
+---------------+
|avg(prediction)|
+---------------+
|         0.5089|
+---------------+
```
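
One way to reason about these two results, sketched under a simplifying assumption: because the features carry no signal, the fitted model should be close to an intercept-only model, whose predicted probability for class 1 is the weighted share of class-1 instances. The function name `intercept_only_probability` is hypothetical, and this is an approximation of what the logistic regression learns, not Spark's actual fitting procedure.

```python
def intercept_only_probability(counts, weights):
    """Weighted share of class 1: the probability that minimizes weighted log loss
    for a model with an intercept and no informative features."""
    weighted_total = sum(counts[c] * weights[c] for c in counts)
    return counts[1] * weights[1] / weighted_total


counts = {0: 1000, 1: 9000}

# Unweighted: every instance counts equally.
p_unweighted = intercept_only_probability(counts, {0: 1.0, 1: 1.0})

# Balanced weights: n_samples / (n_classes * count).
n_samples = sum(counts.values())
balanced = {c: n_samples / (len(counts) * n) for c, n in counts.items()}
p_balanced = intercept_only_probability(counts, balanced)

print(p_unweighted)          # 0.9
print(round(p_balanced, 6))  # 0.5
```

At 0.9 the baseline probability sits well above the default 0.5 decision threshold, so the small noise-driven coefficients rarely flip a prediction and nearly everything is labeled 1. At 0.5 the baseline sits right on the threshold, so noise in the features decides each prediction and the split comes out near even.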