# Lab 9: PCA for dimensionality reduction and Spark data types

[COM6012 Scalable Machine Learning **2024**](https://github.com/COM6012/ScalableML) by [Shuo Zhou](https://shuo-zhou.github.io/) at The University of Sheffield

## Study schedule

- [Task 1](#1-data-types-in-rdd-based-api): To finish in the lab session on 26th April. **Essential**
- [Task 2](#2-pca): To finish in the lab session on 26th April. **Essential**
- [Task 3](#3-exercises): To finish by the following Wednesday, 1st May. ***Exercise***
- [Task 4](#4-additional-ideas-to-explore-optional): To explore further. *Optional*

### Suggested reading

- [Extracting, transforming and selecting features](https://spark.apache.org/docs/3.5.0/ml-features.html)
- [PCA in Spark DataFrame API `pyspark.ml`](https://spark.apache.org/docs/3.5.0/ml-features.html#pca)
- [SVD in Spark RDD API `pyspark.mllib`](https://spark.apache.org/docs/3.5.0/mllib-dimensionality-reduction.html#singular-value-decomposition-svd)
- [StandardScaler in Spark](https://spark.apache.org/docs/3.5.0/ml-features.html#standardscaler), to standardise/normalise data to unit standard deviation and/or zero mean
- [Data Types - RDD-based API](https://spark.apache.org/docs/3.5.0/mllib-data-types.html)
- [PCA on Wikipedia](https://en.wikipedia.org/wiki/Principal_component_analysis)
- [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/)
- [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained), with data available [here](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv) and background info [here](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html)
## 1. Data Types in RDD-based API

To deal with data efficiently, Spark provides different [data types](https://spark.apache.org/docs/3.5.0/mllib-data-types.html). In particular, MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by [Breeze](http://www.scalanlp.org/). A training example used in supervised learning is called a “labeled point” in MLlib.

### [Local vector](https://spark.apache.org/docs/3.5.0/mllib-data-types.html#local-vector): Dense vs Sparse

> A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.

Check out the [Vector in RDD API](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.mllib.linalg.Vectors.html?highlight=mllib%20linalg%20vectors#pyspark.mllib.linalg.Vectors) or [Vector in DataFrame API](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.ml.linalg.Vector.html?highlight=ml%20linalg%20vector#pyspark.ml.linalg.Vector) (see the method `.sparse()`), and the [SparseVector in RDD API](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.mllib.linalg.SparseVector.html?highlight=sparsevector#pyspark.mllib.linalg.SparseVector) or [SparseVector in DataFrame API](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.ml.linalg.SparseVector.html?highlight=sparsevector#pyspark.ml.linalg.SparseVector). The official example is shown below:
```python
import numpy as np
from pyspark.mllib.linalg import Vectors

dv1 = np.array([1.0, 0.0, 3.0])  # Use a NumPy array as a dense vector.
dv2 = [1.0, 0.0, 3.0]  # Use a Python list as a dense vector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])  # Create a SparseVector.
```
Note that the vector created by `Vectors.sparse()` is of type `SparseVector`:

```python
sv1
# SparseVector(3, {0: 1.0, 2: 3.0})
```

To view the sparse vector in a dense format:

```python
sv1.toArray()
# array([1., 0., 3.])
```
### [Labeled point](https://spark.apache.org/docs/3.5.0/mllib-data-types.html#labeled-point)

> A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

See the [LabeledPoint API in MLlib](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.regression.LabeledPoint.html?highlight=labeledpoint#pyspark.mllib.regression.LabeledPoint). Now, we create a labeled point with a positive label and a dense feature vector, as well as a labeled point with a negative label and a sparse feature vector.

```python
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

neg
# LabeledPoint(0.0, (3,[0,2],[1.0,3.0]))
neg.label
# 0.0
neg.features
# SparseVector(3, {0: 1.0, 2: 3.0})
```

Now view the features as a dense vector (rather than a sparse vector):

```python
neg.features.toArray()
# array([1., 0., 3.])
```
### [Local matrix](https://spark.apache.org/docs/3.5.0/mllib-data-types.html#local-matrix)

> A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order. For example, we create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) and a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in the following:

```python
from pyspark.mllib.linalg import Matrices

dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(dm2)
# DenseMatrix([[1., 2.],
#              [3., 4.],
#              [5., 6.]])
print(sm)
# 3 X 2 CSCMatrix
# (0,0) 9.0
# (2,1) 6.0
# (1,1) 8.0
```
See the [Scala API for Matrices.sparse](https://spark.apache.org/docs/3.5.0/api/scala/org/apache/spark/mllib/linalg/Matrices$.html); from its [source code](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala), we can see that it creates a CSC [SparseMatrix](https://spark.apache.org/docs/3.5.0/api/scala/org/apache/spark/mllib/linalg/SparseMatrix.html).

Here the [compressed sparse column (CSC or CCS) format](https://en.wikipedia.org/wiki/Sparse_matrix#Compressed_sparse_column_(CSC_or_CCS)) is used for the sparse matrix representation. You can learn it from this [simple explanation](https://stackoverflow.com/questions/44825193/how-to-create-a-sparse-cscmatrix-using-spark?answertab=votes#tab-top). To learn more about CSC, you may refer to a [top video](https://www.youtube.com/watch?v=fy_dSZb-Xx8) and a [top post with animation](https://matteding.github.io/2019/04/25/sparse-matrices/#compressed-sparse-matrices).

> values are read first by column, a row index is stored for each value, and column pointers are stored. For example, CSC is (val, row_ind, col_ptr), where val is an array of the (top-to-bottom, then left-to-right) non-zero values of the matrix; row_ind is the row indices corresponding to the values; and, col_ptr is the list of val indexes where each column starts.

We can convert the sparse matrix back to a dense format to check the entries:
```python
dsm = sm.toDense()
print(dsm)
# DenseMatrix([[9., 0.],
#              [0., 8.],
#              [0., 6.]])
```
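
To see how the CSC arrays encode `sm`, we can also inspect them directly. A minimal check, assuming the `colPtrs`, `rowIndices` and `values` attributes of `SparseMatrix` in `pyspark.mllib.linalg`:

```python
# Column pointers: column j holds values[colPtrs[j]:colPtrs[j+1]].
print(sm.colPtrs)     # [0 1 3] -> column 0 has 1 value, column 1 has 2 values
# Row index of each stored value, read column by column.
print(sm.rowIndices)  # [0 2 1]
# The non-zero values themselves, in column-major order.
print(sm.values)      # [9. 6. 8.]
```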
### [Distributed matrix](https://spark.apache.org/docs/3.5.0/mllib-data-types.html#distributed-matrix)

> A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.

#### RowMatrix

> The basic type is called RowMatrix. A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge for a RowMatrix so that a single local vector can be reasonably communicated to the driver and can also be stored / operated on using a single node.
> Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.

Now we create an RDD of vectors `rows`, from which we create a RowMatrix `mat`.

```python
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
mat = RowMatrix(rows)

m = mat.numRows()  # Get its size: m=4, n=3
n = mat.numCols()

rowsRDD = mat.rows  # Get the rows as an RDD of vectors again.
```

We can view the RowMatrix in a dense matrix format:

```python
rowsRDD.collect()
# [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0]), DenseVector([7.0, 8.0, 9.0]), DenseVector([10.0, 11.0, 12.0])]
```
## 2. PCA

[Principal component analysis](http://en.wikipedia.org/wiki/Principal_component_analysis) (PCA) is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables (entities each of which takes on various numerical values) into a set of values of linearly uncorrelated variables called **principal components (PCs)**. A PCA class trains a model to project vectors to a low-dimensional space using PCA. This is probably the most commonly used **dimensionality reduction** method.

### PCA in DataFrame-based API `pyspark.ml`

Check out the [API](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.ml.feature.PCA.html?highlight=pyspark%20ml%20feature%20pca#pyspark.ml.feature.PCA). Check [`pyspark.ml.feature.PCAModel`](https://spark.apache.org/docs/3.5.0/api/python/reference/api/pyspark.ml.feature.PCAModel.html?highlight=pyspark%20ml%20feature%20pcamodel#pyspark.ml.feature.PCAModel) too to see what is available for the fitted model. Let us project three 5-dimensional feature vectors onto the top 2 principal components.

```python
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
df.show()
# +--------------------+
# |            features|
# +--------------------+
# | (5,[1,3],[1.0,7.0])|
# |[2.0,0.0,3.0,4.0,...|
# |[4.0,0.0,0.0,6.0,...|
# +--------------------+

pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
# +----------------------------------------+
# |pcaFeatures                             |
# +----------------------------------------+
# |[1.6485728230883807,-4.013282700516296] |
# |[-4.645104331781534,-1.1167972663619026]|
# |[-6.428880535676489,-5.337951427775355] |
# +----------------------------------------+
```
Check the explained variance in percentage:

```python
model.explainedVariance
# DenseVector([0.7944, 0.2056])
```

Take a look at the principal components matrix. Each column is one principal component.

```python
print(model.pc)
# DenseMatrix([[-0.44859172, -0.28423808],
#              [ 0.13301986, -0.05621156],
#              [-0.12523156,  0.76362648],
#              [ 0.21650757, -0.56529588],
#              [-0.84765129, -0.11560341]])
```
### PCA in RDD-based API `pyspark.mllib`

#### Eigendecomposition for PCA

`pyspark.mllib` supports PCA for **tall-and-skinny** (big $n$, small $d$) matrices stored in row-oriented format, as well as any Vectors. We demonstrate how to compute principal components on a [<tt>RowMatrix</tt>](http://spark.apache.org/docs/3.5.0/mllib-data-types.html#rowmatrix) and use them to project the vectors into a low-dimensional space in the cell below.

```python
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
rows.collect()
# [SparseVector(5, {1: 1.0, 3: 7.0}), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]

mat = RowMatrix(rows)
```
Compute the top 2 principal components, which are stored in a local dense matrix (the same as above):

```python
pc = mat.computePrincipalComponents(2)
print(pc)
# DenseMatrix([[-0.44859172, -0.28423808],
#              [ 0.13301986, -0.05621156],
#              [-0.12523156,  0.76362648],
#              [ 0.21650757, -0.56529588],
#              [-0.84765129, -0.11560341]])
```

Project the rows onto the linear space spanned by the top 2 principal components (the same as above):

```python
projected = mat.multiply(pc)
projected.rows.collect()
# [DenseVector([1.6486, -4.0133]), DenseVector([-4.6451, -1.1168]), DenseVector([-6.4289, -5.338])]
```
Now we convert the rows to dense vectors to view the matrix:

```python
from pyspark.mllib.linalg import DenseVector

denseRows = rows.map(lambda vector: DenseVector(vector.toArray()))
denseRows.collect()
# [DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]
```
#### SVD for PCA - a more *scalable* way to do PCA

Read about [SVD in the RDD-based API `pyspark.mllib`](https://spark.apache.org/docs/3.5.0/mllib-dimensionality-reduction.html#singular-value-decomposition-svd). As covered in the lecture, we will need SVD for PCA on large-scale data. Here, we use it on the same small toy example to examine its relationship with the eigendecomposition-based PCA method above.

We compute the top 2 singular values and the corresponding singular vectors:

```python
svd = mat.computeSVD(2, computeU=True)
U = svd.U  # The U factor is a RowMatrix.
s = svd.s  # The singular values are stored in a local dense vector.
V = svd.V  # The V factor is a local dense matrix.
```
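
Why should SVD recover the principal components? A short derivation: with $X_c$ denoting the (column-)centered $n \times d$ data matrix and $X_c = U \Sigma V^\top$ its SVD,

$$X_c^\top X_c = V \Sigma U^\top U \Sigma V^\top = V \Sigma^2 V^\top ,$$

so the columns of $V$ (the right singular vectors) are the eigenvectors of $X_c^\top X_c$, i.e. the principal components, with eigenvalues $\sigma_i^2$. The sample covariance matrix differs from $X_c^\top X_c$ only by the constant factor $1/(n-1)$, which cancels when computing explained-variance ratios.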

If we are doing it right, the **right** singular vectors should be the same as the eigenvectors:

```python
print(V)
# DenseMatrix([[-0.31278534,  0.31167136],
#              [-0.02980145, -0.17133211],
#              [-0.12207248,  0.15256471],
#              [-0.71847899, -0.68096285],
#              [-0.60841059,  0.62170723]])
```
But it is **not the same**! Why? Remember that we need to do **centering**! We can do so using the [StandardScaler](https://spark.apache.org/docs/3.5.0/mllib-feature-extraction.html#standardscaler) (check out the API) to center the data, i.e., remove the mean.

```python
from pyspark.mllib.feature import StandardScaler

standardizer = StandardScaler(withMean=True, withStd=False)  # center only, no scaling
model = standardizer.fit(rows)
centeredRows = model.transform(rows)
centeredRows.collect()
# [DenseVector([-2.0, 0.6667, -1.0, 1.3333, -4.0]), DenseVector([0.0, -0.3333, 2.0, -1.6667, 1.0]), DenseVector([2.0, -0.3333, -1.0, 0.3333, 3.0])]

centeredmat = RowMatrix(centeredRows)
```
Compute the top 2 singular values and the corresponding singular vectors again:

```python
svd = centeredmat.computeSVD(2, computeU=True)
U = svd.U  # The U factor is a RowMatrix.
s = svd.s  # The singular values are stored in a local dense vector.
V = svd.V  # The V factor is a local dense matrix.
```

Check the **PC** obtained this time (it is now the same as from the PCA methods above):

```python
print(V)
# DenseMatrix([[-0.44859172, -0.28423808],
#              [ 0.13301986, -0.05621156],
#              [-0.12523156,  0.76362648],
#              [ 0.21650757, -0.56529588],
#              [-0.84765129, -0.11560341]])
```
Let us examine the relationship between the singular values and the eigenvalues:

```python
print(s)
# [6.001041088520536,3.0530049438580336]
```

We get the eigenvalues by taking squares of the singular values:

```python
evs = s * s
print(evs)
# [36.012494146111734,9.320839187221594]
```

Now we compute the percentage of variance captured and compare it with the above to verify (see/search `model.explainedVariance`):

```python
evs / sum(evs)
# DenseVector([0.7944, 0.2056])
```
## 3. Exercises

### PCA on iris

Study the [Iris flower data set](https://en.wikipedia.org/wiki/Iris_flower_data_set) `iris.csv` under `Data` with PCA.

1. Follow [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/) to do the same analysis using the DataFrame-based PCA `pca.fit()` from `pyspark.ml` (a starter sketch is given after this list).
2. Follow this lab to verify that using the other two RDD-based PCA APIs, `computePrincipalComponents` and `computeSVD`, will give the same PCA features.
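
One possible shape for the DataFrame-based analysis is sketched below. The feature column names are assumptions (check the header of `iris.csv` and adjust); the `VectorAssembler` + `PCA` pattern itself follows the `pyspark.ml` API used above.

```python
from pyspark.ml.feature import VectorAssembler, PCA

# Load the CSV; the column names below are assumed, not checked against iris.csv.
iris = spark.read.csv("Data/iris.csv", header=True, inferSchema=True)
feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

# Assemble the four numeric columns into a single feature vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
iris_features = assembler.transform(iris)

# Project the 4-dimensional features onto the top 2 principal components.
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(iris_features)
model.transform(iris_features).select("pcaFeatures").show(5, truncate=False)
print(model.explainedVariance)
```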
## 4. Additional ideas to explore (*optional*)

### [HR analytics](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html)

Using this [dataset](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv), a company is trying to figure out why its best and most experienced employees are leaving prematurely. Follow the example [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained) to perform such an analysis in PySpark, using as many PySpark APIs as possible.

### Word meaning extraction

Use PySpark to perform the steps in IBM's notebook on [Spark-based machine learning for word meanings](https://github.com/IBMDataScience/word2vec/blob/master/Spark-based%20machine%20learning%20for%20word%20meanings.ipynb), which makes use of PCA, k-means, and Word2Vec to learn word meanings (a minimal Word2Vec starting point is sketched below).
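
As a minimal starting point for the Word2Vec part, the sketch below trains word vectors on a toy corpus; the corpus and column names are placeholders, and the notebook's own preprocessing should replace them.

```python
from pyspark.ml.feature import Word2Vec

# Toy corpus: each row is a list of tokens (replace with real tokenised text).
docs = spark.createDataFrame([
    ("spark is a scalable engine".split(" "),),
    ("pca reduces the dimension of features".split(" "),),
    ("word2vec learns word meanings from context".split(" "),),
], ["tokens"])

# Learn small word vectors; PCA and k-means can then be applied to these vectors.
word2vec = Word2Vec(vectorSize=10, minCount=1, inputCol="tokens", outputCol="docVec")
w2v_model = word2vec.fit(docs)
w2v_model.getVectors().show(truncate=False)  # one learned vector per word
```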
### Bag of words analysis

Choose a [Bag of Words Data Set](https://archive.ics.uci.edu/ml/datasets/Bag+of+Words). Let us take the **NIPS full papers** data as an example.

The format of this data is:

```markdown
Number of documents
Number of words in the vocabulary
Total number of words in the collection
docID wordID count
docID wordID count
...
docID wordID count
```

Our data matrix will be $doc \times wordcount$. To begin, we need to read this data in. Possible steps would include the following (see the sketch after this list):

1. extract the number of documents and the size of the vocabulary, and strip off the first 3 lines
2. combine the words per document
3. create sparse vectors (for better space efficiency)

Start from a small dataset to test your work, and then check **whether** your work scales up to the big **NYTIMES** bag-of-words data. Keep everything as parallel as possible.
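
A possible sketch of these steps is below, assuming the triples have been downloaded to `Data/docword.nips.txt` (the path and details are placeholders to adapt):

```python
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

raw = sc.textFile("Data/docword.nips.txt")  # assumed path
header = raw.take(3)                        # first 3 lines: D, W, NNZ
num_docs, vocab_size = int(header[0]), int(header[1])

# Drop the 3 header lines, then parse the "docID wordID count" triples.
triples = (raw.zipWithIndex()
              .filter(lambda x: x[1] >= 3)
              .map(lambda x: x[0].split())
              .map(lambda t: (int(t[0]), (int(t[1]) - 1, float(t[2])))))  # 0-based wordID

# Combine the (wordID, count) pairs per document into one sparse vector.
doc_vectors = (triples.groupByKey()
                      .map(lambda kv: Vectors.sparse(vocab_size, sorted(kv[1]))))

mat = RowMatrix(doc_vectors)  # ready for computePrincipalComponents / computeSVD
```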
### Large image datasets

Find some large-scale image datasets to examine the principal components and explore low-dimensional representations.
