Many (if not all) of PySpark's machine learning algorithms require the input data to be assembled into a single vector column, typically with VectorAssembler. This is all well and good for model training, but applying non-machine-learning operations (e.g., ordinary aggregations) to data in this format can be a real pain. Here, I describe how to aggregate (average, in this case) data stored in sparse and dense vectors.
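As a quick illustration of that input format, here is a minimal sketch of assembling two ordinary numeric columns into a single vector column. The column names (x1, x2) are made up for the example, and it assumes a SparkSession named spark is available.
from pyspark.ml.feature import VectorAssembler
# hypothetical two-column DataFrame; x1 and x2 are made-up names
raw = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["x1", "x2"])
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
assembled = assembler.transform(raw)  # adds a single "features" vector column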
I start by importing the necessary libraries and creating a Spark DataFrame that includes a column of sparse vectors. Note that I am using the SparseVector from ml.linalg and not the one from mllib.linalg; the two are distinct types, and mixing them up makes a big difference (see the quick check after the first output table below).
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
# note that we are using the Sparse and Dense vectors from ml.linalg;
# Spark also has Sparse/Dense vectors in mllib.linalg, which are a different type
df = sc.parallelize([
    (1, SparseVector(10, {1: 1.0, 2: 1.0, 3: 2.0, 4: 1.0, 5: 3.0})),
    (2, SparseVector(10, {9: 100.0})),
    (3, SparseVector(10, {1: 1.0})),
]).toDF(["row_num", "features"])
df.show()
row_num | features |
---|---|
1 | (10,[1,2,3,4,5],[1.0, 1.0, 2.0, 1.0, 3.0]) |
2 | (10,[9],[100.0]) |
3 | (10,[1],[1.0]) |
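If you want to convince yourself that the ml.linalg and mllib.linalg vector types really are different, a quick check along these lines (a sketch, not part of the pipeline above) shows that the classes are unrelated and that the features column carries the ml flavor of VectorUDT:
from pyspark.ml.linalg import SparseVector as MLSparseVector
from pyspark.mllib.linalg import SparseVector as MLlibSparseVector
print(MLSparseVector is MLlibSparseVector)  # False -- two unrelated classes
print(df.schema["features"].dataType)       # VectorUDT from pyspark.ml.linalg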
Next, I write a UDF that converts the sparse vector into a dense vector and then turns the dense vector into a Python list. The Python list becomes a Spark array when it comes out of the UDF.
def sparse_to_array(v):
    # densify the sparse vector, then convert it to a plain list of floats
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
df = df.withColumn('features_array', sparse_to_array_udf('features'))
df.show()
row_num | features | features_array |
---|---|---|
1 | (10,[1,2,3,4,5],[1.0, 1.0, 2.0, 1.0, 3.0]) | [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0] |
2 | (10,[9],[100.0]) | [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0] |
3 | (10,[1],[1.0]) | [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] |
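As an aside, if you are on Spark 3.0 or later you can likely skip the UDF entirely: pyspark.ml.functions provides a built-in vector_to_array that handles both sparse and dense ml.linalg vectors. A minimal sketch of that alternative:
from pyspark.ml.functions import vector_to_array
# built-in conversion, available since Spark 3.0; works for sparse and dense vectors
df = df.withColumn('features_array', vector_to_array('features'))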
Now that the data is in a PySpark array, we can apply the desired PySpark aggregation to each element of the array (here, averaging each of the ten positions across rows).
df_agg = df.agg(F.array(*[F.avg(F.col('features_array')[i]) for i in range(10)]).alias("averages"))
df_agg.show()
averages |
---|
[0.0, 0.66667, 0.33333, 0.66667, 0.33333, 1.0, 0.0, 0.0, 0.0, 33.33333] |
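If you need the averages back on the driver as a plain Python list (for plotting, logging, etc.), one way, sketched here, is to pull the single aggregated row back with first():
averages = df_agg.first()['averages']  # a Python list of 10 floats
print(averages)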
Now, let's run through the same exercise with dense vectors. We start by creating a Spark DataFrame with a column of dense vectors.
df = sc.parallelize([
    (1, DenseVector([0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0])),
    (2, DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0])),
    (3, DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
]).toDF(["row_num", "features"])
df.show()
row_num | features |
---|---|
1 | [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0] |
2 | [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0] |
3 | [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] |
Next, we create another PySpark UDF, which converts the dense vector into a PySpark array.
def dense_to_array(v):
    # convert the dense vector to a plain list of floats
    new_array = list([float(x) for x in v])
    return new_array

dense_to_array_udf = F.udf(dense_to_array, T.ArrayType(T.FloatType()))
df = df.withColumn('features_array', dense_to_array_udf('features'))
df.show()
row_num | features | features_array |
---|---|---|
1 | [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0] | [0.0, 1.0, 1.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0, 0.0] |
2 | [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0] | [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0] |
3 | [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] | [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] |
Finally, we can apply our standard PySpark aggregators to each element of the PySpark array.
df_agg = df.agg(F.array(*[F.avg(F.col('features_array')[i]) for i in range(10)]).alias("averages"))
df_agg.show()
averages |
---|
[0.0, 0.66667, 0.33333, 0.66667, 0.33333, 1.0, 0.0, 0.0, 0.0, 33.33333] |
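The same pattern generalizes to any column-wise aggregator; for example, swapping F.avg for F.max (or F.sum, F.stddev, and so on) gives the element-wise maximum instead of the mean. A quick sketch:
# element-wise maximum across rows, using the same array-indexing trick
df_max = df.agg(F.array(*[F.max(F.col('features_array')[i]) for i in range(10)]).alias("maxes"))
df_max.show()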
There we go! Hope you find this info helpful!