Dan Vatterott

Data Scientist

Balancing Model Weights in PySpark

Imbalanced classes are a common problem. Scikit-learn provides an easy fix: "balanced" class weights, which make models such as logistic regression more likely to predict the less common classes.

The PySpark ML API doesn’t have this same functionality, so in this blog post, I describe how to balance class weights yourself.
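For reference, scikit-learn exposes this behavior as a single keyword argument. Here's a minimal sketch of that version (with throwaway random data, just to show the API):

import numpy as np
from sklearn.linear_model import LogisticRegression

# class_weight="balanced" reweights each class by
# n_samples / (n_classes * np.bincount(y))
X_sk = np.random.normal(0, 1, (1000, 5))
y_sk = (np.random.rand(1000) > 0.9).astype(int)  # roughly 10% positive class

clf = LogisticRegression(class_weight="balanced")
clf.fit(X_sk, y_sk)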

import numpy as np
import pandas as pd
from itertools import chain
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

sc = SparkContext("local", "Example")
spark = SparkSession(sc)

Generate some random data and put it in a Spark DataFrame. Note that the input variables are not predictive, so the model will behave randomly. This is okay, since I am not interested in model accuracy.

X = np.random.normal(0, 1, (10000, 10))

y = np.ones(X.shape[0]).astype(int)
y[:1000] = 0
np.random.shuffle(y)

print(np.mean(y)) # 0.9

X = np.append(X, y.reshape((10000, 1)), 1)

DF = spark.createDataFrame(pd.DataFrame(X))
DF = DF.withColumnRenamed("10", "y")

Here’s how Scikit-learn computes class weights when “balanced” weights are requested.

# class weight
# https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html
# n_samples / (n_classes * np.bincount(y)).

class_weights = {i: ii for i, ii in zip(np.unique(y), len(y) / (len(np.unique(y)) * np.bincount(y)))}
print(class_weights) # {0: 5.0, 1: 0.5555555555555556}
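If you'd rather not hand-roll that formula, scikit-learn's compute_class_weight utility should produce the same numbers; a quick sanity check, assuming the y array from above:

from sklearn.utils.class_weight import compute_class_weight

# should match the dictionary above: array([5.0, 0.5555...])
print(compute_class_weight(class_weight="balanced", classes=np.unique(y), y=y))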

Here’s how we can compute “balanced” weights with data from a PySpark DataFrame.

y_collect = DF.select("y").groupBy("y").count().collect()
unique_y = [x["y"] for x in y_collect]
total_y = sum([x["count"] for x in y_collect])
unique_y_count = len(y_collect)
bin_count = [x["count"] for x in y_collect]

class_weights_spark = {i: ii for i, ii in zip(unique_y, total_y / (unique_y_count * np.array(bin_count)))}
print(class_weights_spark) # {0.0: 5.0, 1.0: 0.5555555555555556}
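The numpy call isn't strictly necessary; a plain dictionary comprehension over the collected rows gives the same result:

# same weights, computed without numpy
class_weights_spark = {row["y"]: total_y / (unique_y_count * row["count"]) for row in y_collect}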

PySpark expects a weight for each instance (i.e., row) in the training set, so I create a mapping expression that attaches the appropriate class weight to every row.

mapping_expr = F.create_map([F.lit(x) for x in chain(*class_weights_spark.items())])

DF = DF.withColumn("weight", mapping_expr.getItem(F.col("y")))

I assemble all the input features into a vector.

assembler = VectorAssembler(inputCols=[str(x) for x in range(10)], outputCol="features")

DF = assembler.transform(DF).drop(*[str(x) for x in range(10)])

And train a logistic regression. Without the instance weights, the model predicts every instance as the more frequent class.

lr = LogisticRegression(featuresCol="features", labelCol="y")
lrModel = lr.fit(DF)
lrModel.transform(DF).agg(F.mean("prediction")).show()
+---------------+
|avg(prediction)|
+---------------+
|            1.0|
+---------------+

With the weights, the model assigns roughly half the instances to each class (even the less common one).

lr = LogisticRegression(featuresCol="features", labelCol="y", weightCol="weight")
lrModel = lr.fit(DF)
lrModel.transform(DF).agg(F.mean("prediction")).show()
+---------------+
|avg(prediction)|
+---------------+
|         0.5089|
+---------------+
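Since the features are pure noise, neither model should actually separate the classes. If you want to confirm that, a quick check with Spark's built-in evaluator (a sketch) looks like this; the area under the ROC curve should hover around 0.5 with or without the weights:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# AUC should be ~0.5 on this random data, weighted or not
evaluator = BinaryClassificationEvaluator(labelCol="y", metricName="areaUnderROC")
print(evaluator.evaluate(lrModel.transform(DF)))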
