Many (if not all) of PySpark's machine learning algorithms require that the input data be concatenated into a single vector column (using `VectorAssembler`). This is all well and good, but applying non-machine-learning operations (e.g., any aggregations) to data in this format can be a real pain. Here, I describe how to aggregate (average, in this case) data stored in sparse and dense vectors.
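For context, the assembly step usually looks something like the sketch below (the column names and `raw_df` are hypothetical, just to illustrate what produces the single vector column):

```python
from pyspark.ml.feature import VectorAssembler

# Hypothetical DataFrame `raw_df` with separate numeric columns x1, x2, x3.
assembler = VectorAssembler(inputCols=["x1", "x2", "x3"], outputCol="features")
assembled_df = assembler.transform(raw_df)  # adds a single vector column "features"
```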
I start by importing the necessary libraries and creating a Spark DataFrame, which includes a column of sparse vectors. Note that I am using the SparseVector from ml.linalg and not the SparseVector from mllib. This makes a big difference!
```python
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
# note that I am using Sparse and Dense Vectors from ml.linalg. There are other Sparse/Dense vectors in Spark.

df = sc.parallelize([
    (1, SparseVector(10, {1: 1.0, 2: 1.0, 3: 2.0, 4: 1.0, 5: 3.0})),
    (2, SparseVector(10, {9: 100.0})),
    (3, SparseVector(10, {1: 1.0})),
]).toDF(["row_num", "features"])

df.show()
```
| row_num | features |
|---------|----------|
| 1 | (10,[1,2,3,4,5],[1.0, 1.0, 2.0, 1.0, 3.0]) |
| 2 | (10,[9],[100.0]) |
| 3 | (10,[1],[1.0]) |
Next, I write a UDF that changes the sparse vector into a dense vector and then changes the dense vector into a Python list. The Python list is then turned into a Spark array when it comes out of the UDF.
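A minimal sketch of such a UDF, building on the imports above, might look like the following (the function and column names here are my own, illustrative choices):

```python
# Sketch of a UDF that converts a SparseVector column into a Spark array column.
# The names sparse_to_array and features_array are illustrative.
def sparse_to_array(v):
    # SparseVector -> DenseVector -> plain Python list of floats
    dense = DenseVector(v.toArray())
    return [float(x) for x in dense.values]

sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))

df = df.withColumn("features_array", sparse_to_array_udf("features"))
```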