Dan Vatterott

Data Scientist

Aggregating Sparse and Dense Vectors in PySpark

Many (if not all of) PySpark’s machine learning algorithms require the input data is concatenated into a single column (using the vector assembler command). This is all well and good, but applying non-machine learning algorithms (e.g., any aggregations) to data in this format can be a real pain. Here, I describe how to aggregate (average in this case) data in sparse and dense vectors.

I start by importing the necessary libraries and creating a spark dataframe, which includes a column of sparse vectors. Note that I am using ml.linalg SparseVector and not the SparseVector from mllib. This makes a big difference!

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
# note that using Sparse and Dense Vectors from ml.linalg. There are other Sparse/Dense vectors in spark.

df = sc.parallelize([
  (1, SparseVector(10, {1: 1.0, 2: 1.0, 3: 2.0, 4: 1.0, 5: 3.0})),
  (2, SparseVector(10, {9: 100.0})),
  (3, SparseVector(10, {1: 1.0})),
]).toDF(["row_num", "features"])

df.show()
row_num features
1 (10,[1,2,3,4,5],[1.0, 1.0, 2.0, 1.0, 3.0])
2 (10,[9],[100.0])
3 (10,[1],[1.0])

Next, I write a udf, which changes the sparse vector into a dense vector and then changes the dense vector into a python list. The python list is then turned into a spark array when it comes out of the udf.

1
2
3
4
5
6
7
8
9
def sparse_to_array(v):
  v = DenseVector(v)
  new_array = list([float(x) for x in v])
  return new_array

sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))

df = df.withColumn('features_array', sparse_to_array_udf('features'))
df.show()
row_num features features_array
1 (10,[1,2,3,4,5],[1.0, 1.0, 2.0, 1.0, 3.0]) [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0]
2 (10,[9],[100.0]) [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0]
3 (10,[1],[1.0]) [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

Now that the data is in a PySpark array, we can apply the desired PySpark aggregation to each item in the array.

1
2
df_agg = df.agg(F.array(*[F.avg(F.col('features_array')[i]) for i in range(10)]).alias("averages"))
df_agg.show()
averages
[0.0, 0.66667, 0.33333, 0.66667, 0.33333, 1.0, 0.0, 0.0, 0.0, 33.33333]

Now, let’s run through the same exercise with dense vectors. We start by creating a spark dataframe with a column of dense vectors.

1
2
3
4
5
6
7
df = sc.parallelize([
  (1, DenseVector([0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0])),
  (2, DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0])),
  (3, DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
]).toDF(["row_num", "features"])

df.show()
row_num features
1 [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0]
2 [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0]
3 [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

Next, we create another PySpark udf which changes the dense vector into a PySpark array.

1
2
3
4
5
6
7
8
def dense_to_array(v):
  new_array = list([float(x) for x in v])
  return new_array

dense_to_array_udf = F.udf(dense_to_array, T.ArrayType(T.FloatType()))

df = df.withColumn('features_array', dense_to_array_udf('features'))
df.show()
row_num features features_array
1 [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0] [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0]
2 [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0] [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0]
3 [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

Finally, we can use our standard PySpark aggregators to each item in the PySpark array.

1
2
df_agg = df.agg(F.array(*[F.avg(F.col('features_array')[i]) for i in range(10)]).alias("averages"))
df_agg.show()
averages
[0.0, 0.66667, 0.33333, 0.66667, 0.33333, 1.0, 0.0, 0.0, 0.0, 33.33333]

There we go! Hope you find this info helpful!

Comments