Dan Vatterott

Creating a CDF in PySpark

Cumulative distribution functions (CDFs) are a useful tool for understanding your data. This tutorial will demonstrate how to create a CDF in PySpark.
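For intuition, here's what an empirical CDF looks like in plain numpy when the data fits in memory (a minimal sketch, not part of the PySpark pipeline): sort the sample, then for each sorted value report the fraction of points at or below it.

import numpy as np

# Empirical CDF of an in-memory sample: cdf[i] = P(X <= x[i])
x = np.sort(np.random.normal(0, 1, 1000))
cdf = np.arange(1, len(x) + 1) / len(x)

The rest of this post builds the same thing in PySpark, where the data may not fit on a single machine.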

I start by creating normally distributed, fake data.

import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

sc = SparkContext("local", "Example")
spark = SparkSession(sc)

# 1,000 draws from a standard normal, loaded into a one-column DataFrame
a = sc.parallelize([(float(x),) for x in np.random.normal(0, 1, 1000)]).toDF(['X'])
a.limit(5).show()
X
1.3162087724709406
-0.9226127327757598
0.5388249247619141
-0.38263792383896356
0.20584675505779562

To create the CDF I need to use a window function to order the data. I can then use percent_rank to retrieve the percentile associated with each value.
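As a quick illustration of percent_rank's semantics, here is a toy sketch reusing the spark session created above:

# percent_rank is (rank - 1) / (n - 1): the share of the *other* rows
# strictly below each value, so the smallest row always gets 0.0.
toy = spark.createDataFrame([(1.0,), (2.0,), (3.0,), (4.0,), (5.0,)], ['X'])
toy.withColumn('pct', F.percent_rank().over(Window.orderBy('X'))).show()
# pct: 0.0, 0.25, 0.5, 0.75, 1.0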

The only trick here is that I round the column of interest: rounding coarsens the values so the group-by below returns far fewer rows, which keeps me from retrieving too much data onto the master node (not a concern with 1,000 points, but always good to think about).

After rounding, I group by the variable of interest, again to limit the amount of data returned.

# order every row by X so percent_rank can rank the full column
win = Window.orderBy('X')

output = (a
          .withColumn('cumulative_probability', F.percent_rank().over(win))
          # round to one decimal so the group-by returns few distinct bins
          .withColumn("X", F.round(F.col("X"), 1))
          .groupBy("X")
          .agg(F.max("cumulative_probability").alias("cumulative_probability"),
               F.count('*').alias("my_count")))

output.limit(5).show()
X     cumulative_probability  my_count
-3.5  0.0                     1
-3.3  0.001001001001001001    1
-2.9  0.002002002002002002    1
-2.8  0.003003003003003003    1
-2.7  0.004004004004004004    1

A CDF should report the percent of data less than or equal to the specified value, but percent_rank returns the percent of data strictly less than the specified value. For example, the smallest bin, -3.5, contains one of the 1,000 points, so its CDF value should be roughly 0.001, not the 0.0 reported above. We need to fix this by shifting the probabilities up one row.

To shift the data, I will use the lead window function.

output = (a
          .withColumn('cumulative_probability', F.percent_rank().over(win))
          .withColumn("X", F.round(F.col("X"), 1))
          .groupBy("X")
          .agg(F.max("cumulative_probability").alias("cumulative_probability"),
               F.count('*').alias("my_count"))
          # pull each bin's probability from the next row so bins report P(X <= x)
          .withColumn("cumulative_probability", F.lead(F.col("cumulative_probability")).over(win))
          # the largest bin has no next row to lead into; its CDF value is 1
          .fillna(1, subset=["cumulative_probability"]))

output.limit(5).show()
X     cumulative_probability  my_count
-3.5  0.001001001001001001    1
-3.3  0.002002002002002002    1
-2.9  0.003003003003003003    1
-2.8  0.004004004004004004    1
-2.7  0.005005005005005005    1

There we go! A CDF of the data! I hope you find this helpful!
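As a final sanity check, here is a sketch (assuming scipy and pandas are available on the driver) comparing the empirical CDF against the theoretical standard normal CDF, which it should closely track:

from scipy.stats import norm

# After rounding and grouping, output is small enough to collect safely.
pdf = output.orderBy('X').toPandas()
pdf['theoretical'] = norm.cdf(pdf['X'])  # N(0, 1) CDF at each bin
print(pdf[['X', 'cumulative_probability', 'theoretical']].head())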
