# Lab 7: PCA for dimensionality reduction and Spark data types

[COM6012 Scalable Machine Learning **2023**](https://github.com/haipinglu/ScalableML) by [Haiping Lu](https://haipinglu.github.io/) at The University of Sheffield

**Accompanying lectures**: [YouTube video lectures recorded in Year 2020/21.](https://www.youtube.com/watch?v=jz48ORUxqB4&list=PLuRoUKdWifzylmRAERh5ipVhmQY_IoS4Y)

## Study schedule

- [Task 1](#1-data-types-in-rdd-based-api): To finish in the lab session on 28th April. **Essential**
- [Task 2](#2-pca): To finish in the lab session on 28th April. **Essential**
- [Task 3](#3-exercises): To finish by the following Wednesday 29th March. ***Exercise***
- [Task 4](#4-additional-ideas-to-explore-optional): To explore further. *Optional*

### Suggested reading

- [Extracting, transforming and selecting features](https://spark.apache.org/docs/3.3.1/ml-features.html)
- [PCA in Spark DataFrame API `pyspark.ml`](https://spark.apache.org/docs/3.3.1/ml-features.html#pca)
- [SVD in Spark RDD API `pyspark.mllib`](https://spark.apache.org/docs/3.3.1/mllib-dimensionality-reduction.html#singular-value-decomposition-svd)
- [StandardScaler in Spark](https://spark.apache.org/docs/3.3.1/ml-features.html#standardscaler) to standardise/normalise data to unit standard deviation and/or zero mean
- [Data Types - RDD-based API](https://spark.apache.org/docs/3.3.1/mllib-data-types.html)
- [PCA on Wikipedia](https://en.wikipedia.org/wiki/Principal_component_analysis)
- [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/)
- [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained), with data available [here](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv) and background info [here](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html)
## 1. Data Types in RDD-based API

To deal with data efficiently, Spark considers different [data types](https://spark.apache.org/docs/3.3.1/mllib-data-types.html). In particular, MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by [Breeze](http://www.scalanlp.org/). A training example used in supervised learning is called a “labeled point” in MLlib.

### [Local vector](https://spark.apache.org/docs/3.3.1/mllib-data-types.html#local-vector): Dense vs Sparse

> A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.

Check out the [Vector in RDD API](https://spark.apache.org/docs/3.3.1/api/python/reference/api/pyspark.mllib.linalg.Vectors.html?highlight=mllib%20linalg%20vectors#pyspark.mllib.linalg.Vectors) or [Vector in DataFrame API](https://spark.apache.org/docs/3.3.1/api/python/reference/api/pyspark.ml.linalg.Vector.html?highlight=ml%20linalg%20vector#pyspark.ml.linalg.Vector) (see the `.sparse()` method) and the [SparseVector in RDD API](https://spark.apache.org/docs/3.3.1/api/python/reference/api/pyspark.mllib.linalg.SparseVector.html?highlight=sparsevector#pyspark.mllib.linalg.SparseVector) or [SparseVector in DataFrame API](https://spark.apache.org/docs/3.3.1/api/python/reference/api/pyspark.ml.linalg.SparseVector.html?highlight=sparsevector#pyspark.ml.linalg.SparseVector). The official example is below:

```python
import numpy as np
from pyspark.mllib.linalg import Vectors

dv1 = np.array([1.0, 0.0, 3.0])  # Use a NumPy array as a dense vector.
dv2 = [1.0, 0.0, 3.0]  # Use a Python list as a dense vector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])  # Create a SparseVector.
```

Note that the vector created by `Vectors.sparse()` is of type `SparseVector`:

```python
sv1
# SparseVector(3, {0: 1.0, 2: 3.0})
```

To view the sparse vector in a dense format:

```python
sv1.toArray()
# array([1., 0., 3.])
```
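The sparse triple `(size, indices, values)` expands to the dense form by filling in zeros. A minimal plain-Python sketch (illustrative only, not MLlib's implementation):

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v  # place each stored value at its index
    return dense

print(sparse_to_dense(3, [0, 2], [1.0, 3.0]))
# [1.0, 0.0, 3.0]
```

This mirrors what `sv1.toArray()` returns for the vector above.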
### [Labeled point](https://spark.apache.org/docs/3.3.1/mllib-data-types.html#labeled-point)

> A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

See the [LabeledPoint API in MLlib](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.regression.LabeledPoint.html?highlight=labeledpoint#pyspark.mllib.regression.LabeledPoint). Now, we create a labeled point with a positive label and a dense feature vector, as well as a labeled point with a negative label and a sparse feature vector.

```python
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

neg
# LabeledPoint(0.0, (3,[0,2],[1.0,3.0]))
neg.label
# 0.0
neg.features
# SparseVector(3, {0: 1.0, 2: 3.0})
```

Now view the features as a dense vector (rather than a sparse vector):

```python
neg.features.toArray()
# array([1., 0., 3.])
```
### [Local matrix](https://spark.apache.org/docs/3.3.1/mllib-data-types.html#local-matrix)

> A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order. For example, we create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) and a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in the following:

```python
from pyspark.mllib.linalg import Matrix, Matrices

dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(dm2)
# DenseMatrix([[1., 2.],
#              [3., 4.],
#              [5., 6.]])
print(sm)
# 3 X 2 CSCMatrix
# (0,0) 9.0
# (2,1) 6.0
# (1,1) 8.0
```

See the [Scala API for Matrices.sparse](https://spark.apache.org/docs/3.3.1/api/scala/org/apache/spark/mllib/linalg/Matrices$.html); from its [source code](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala), we can see that it creates a CSC [SparseMatrix](https://spark.apache.org/docs/3.3.1/api/scala/org/apache/spark/mllib/linalg/SparseMatrix.html).

Here the [compressed sparse column (CSC or CCS) format](https://en.wikipedia.org/wiki/Sparse_matrix#Compressed_sparse_column_(CSC_or_CCS)) is used for sparse matrix representation. You can learn it from this [simple explanation](https://stackoverflow.com/questions/44825193/how-to-create-a-sparse-cscmatrix-using-spark?answertab=votes#tab-top). To learn more about CSC, you may refer to this [video](https://www.youtube.com/watch?v=fy_dSZb-Xx8) and this [post with animation](https://matteding.github.io/2019/04/25/sparse-matrices/#compressed-sparse-matrices).

> values are read first by column, a row index is stored for each value, and column pointers are stored. For example, CSC is (val, row_ind, col_ptr), where val is an array of the (top-to-bottom, then left-to-right) non-zero values of the matrix; row_ind is the row indices corresponding to the values; and col_ptr is the list of val indexes where each column starts.

```python
dsm = sm.toDense()
print(dsm)
# DenseMatrix([[9., 0.],
#              [0., 8.],
#              [0., 6.]])
```
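The same `(values, rowIndices, colPtrs)` layout can be cross-checked with SciPy's `csc_matrix`, which uses the identical CSC convention (a sketch, assuming `scipy` is installed):

```python
import numpy as np
from scipy.sparse import csc_matrix

# Same three arrays as Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8]):
# data (values), indices (row indices), indptr (column pointers).
m = csc_matrix(([9.0, 6.0, 8.0], [0, 2, 1], [0, 1, 3]), shape=(3, 2))
print(m.toarray())
# [[9. 0.]
#  [0. 8.]
#  [0. 6.]]
```

Column 0 holds one value (indptr 0 to 1), column 1 holds two (indptr 1 to 3), reproducing the dense matrix that `sm.toDense()` prints.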
### [Distributed matrix](https://spark.apache.org/docs/3.3.1/mllib-data-types.html#distributed-matrix)

> A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.

#### RowMatrix

> The basic type is called RowMatrix. A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge for a RowMatrix so that a single local vector can be reasonably communicated to the driver and can also be stored / operated on using a single node.
> Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.

Now we create an RDD of vectors `rows`, from which we create a RowMatrix `mat`.

```python
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
mat = RowMatrix(rows)

m = mat.numRows()  # Get its size: m=4, n=3
n = mat.numCols()

rowsRDD = mat.rows  # Get the rows as an RDD of vectors again.
```

We can view the RowMatrix in a dense matrix format:

```python
rowsRDD.collect()
# [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0]), DenseVector([7.0, 8.0, 9.0]), DenseVector([10.0, 11.0, 12.0])]
```
## 2. PCA

[Principal component analysis](http://en.wikipedia.org/wiki/Principal_component_analysis) (PCA) is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called **principal components (PCs)**. A PCA class trains a model to project vectors into a low-dimensional space, and PCA is probably the most commonly used **dimensionality reduction** method.

### PCA in DataFrame-based API `pyspark.ml`

Check out the [API](https://spark.apache.org/docs/3.3.1/api/python/reference/api/pyspark.ml.feature.PCA.html?highlight=pyspark%20ml%20feature%20pca#pyspark.ml.feature.PCA). Check [`pyspark.ml.feature.PCAModel`](https://spark.apache.org/docs/3.3.1/api/python/reference/api/pyspark.ml.feature.PCAModel.html?highlight=pyspark%20ml%20feature%20pcamodel#pyspark.ml.feature.PCAModel) too to see what is available for the fitted model. Let us project three 5-dimensional feature vectors onto 2-dimensional principal components.

```python
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
df.show()
# +--------------------+
# |            features|
# +--------------------+
# | (5,[1,3],[1.0,7.0])|
# |[2.0,0.0,3.0,4.0,...|
# |[4.0,0.0,0.0,6.0,...|
# +--------------------+

pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
# +----------------------------------------+
# |pcaFeatures                             |
# +----------------------------------------+
# |[1.6485728230883807,-4.013282700516296] |
# |[-4.645104331781534,-1.1167972663619026]|
# |[-6.428880535676489,-5.337951427775355] |
# +----------------------------------------+
```
Check the explained variance in percentage:

```python
model.explainedVariance
# DenseVector([0.7944, 0.2056])
```

Take a look at the principal components matrix. Each column is one principal component.

```python
print(model.pc)
# DenseMatrix([[-0.44859172, -0.28423808],
#              [ 0.13301986, -0.05621156],
#              [-0.12523156,  0.76362648],
#              [ 0.21650757, -0.56529588],
#              [-0.84765129, -0.11560341]])
```
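The projection itself is just a matrix product of the raw feature matrix with `model.pc`. Using the numbers printed above, a NumPy cross-check (a sketch, independent of Spark):

```python
import numpy as np

# The three feature vectors from df, with the sparse row densified.
X = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])
# model.pc as printed above; each column is one principal component.
pc = np.array([[-0.44859172, -0.28423808],
               [ 0.13301986, -0.05621156],
               [-0.12523156,  0.76362648],
               [ 0.21650757, -0.56529588],
               [-0.84765129, -0.11560341]])
proj = X @ pc  # rows match the pcaFeatures column, e.g. [1.6486, -4.0133]
print(proj)
```

Note that `transform` multiplies the raw (uncentered) features by `model.pc`, which is why this simple product reproduces `pcaFeatures`.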
### PCA in RDD-based API `pyspark.mllib`

#### Eigendecomposition for PCA

`pyspark.mllib` supports PCA for **tall-and-skinny** (big $n$, small $d$) matrices stored in row-oriented format, as well as any Vectors. We demonstrate how to compute principal components on a [<tt>RowMatrix</tt>](http://spark.apache.org/docs/3.3.1/mllib-data-types.html#rowmatrix) and use them to project the vectors into a low-dimensional space in the cell below.

```python
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
rows.collect()
# [SparseVector(5, {1: 1.0, 3: 7.0}), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]

mat = RowMatrix(rows)
```
Compute the top 2 principal components, which are stored in a local dense matrix (the same as above):

```python
pc = mat.computePrincipalComponents(2)
print(pc)
# DenseMatrix([[-0.44859172, -0.28423808],
#              [ 0.13301986, -0.05621156],
#              [-0.12523156,  0.76362648],
#              [ 0.21650757, -0.56529588],
#              [-0.84765129, -0.11560341]])
```

Project the rows to the linear space spanned by the top 2 principal components (the same as above):

```python
projected = mat.multiply(pc)
projected.rows.collect()
# [DenseVector([1.6486, -4.0133]), DenseVector([-4.6451, -1.1168]), DenseVector([-6.4289, -5.338])]
```
Now we convert the rows to dense vectors to view the matrix:

```python
from pyspark.mllib.linalg import DenseVector

denseRows = rows.map(lambda vector: DenseVector(vector.toArray()))
denseRows.collect()
# [DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])]
```
#### SVD for PCA: a more *scalable* way to do PCA

Read [SVD in RDD-based API `pyspark.mllib`](https://spark.apache.org/docs/3.3.1/mllib-dimensionality-reduction.html#singular-value-decomposition-svd). As covered in the lecture, we need SVD for PCA on large-scale data. Here, we use it on the same small toy example to examine its relationship with the eigendecomposition-based PCA method above.

We compute the top 2 singular values and corresponding singular vectors:

```python
svd = mat.computeSVD(2, computeU=True)
U = svd.U  # The U factor is a RowMatrix.
s = svd.s  # The singular values are stored in a local dense vector.
V = svd.V  # The V factor is a local dense matrix.
```

If we are doing it right, the **right** singular vectors should be the same as the eigenvectors:

```python
print(V)
# DenseMatrix([[-0.31278534,  0.31167136],
#              [-0.02980145, -0.17133211],
#              [-0.12207248,  0.15256471],
#              [-0.71847899, -0.68096285],
#              [-0.60841059,  0.62170723]])
```
But it is **not the same**! Why? Remember that we need to do **centering**! We can do so using the [StandardScaler (check out the API)](https://spark.apache.org/docs/3.3.1/mllib-feature-extraction.html#standardscaler) to center the data, i.e., remove the mean.

```python
from pyspark.mllib.feature import StandardScaler

standardizer = StandardScaler(True, False)  # withMean=True, withStd=False
model = standardizer.fit(rows)
centeredRows = model.transform(rows)
centeredRows.collect()
# [DenseVector([-2.0, 0.6667, -1.0, 1.3333, -4.0]), DenseVector([0.0, -0.3333, 2.0, -1.6667, 1.0]), DenseVector([2.0, -0.3333, -1.0, 0.3333, 3.0])]
centeredmat = RowMatrix(centeredRows)
```
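Centering with `StandardScaler(True, False)` is just column-mean subtraction, which is easy to confirm with NumPy (a sketch on the same toy data):

```python
import numpy as np

# The three rows of `mat`, with the sparse row densified.
X = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])
Xc = X - X.mean(axis=0)  # subtract the per-column mean
print(Xc)
# first row ≈ [-2, 0.6667, -1, 1.3333, -4], matching centeredRows above
```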
Compute the top 2 singular values and corresponding singular vectors:

```python
svd = centeredmat.computeSVD(2, computeU=True)
U = svd.U  # The U factor is a RowMatrix.
s = svd.s  # The singular values are stored in a local dense vector.
V = svd.V  # The V factor is a local dense matrix.
```

Check the **PC** obtained this time (it is the same as the above PCA methods now):

```python
print(V)
# DenseMatrix([[-0.44859172, -0.28423808],
#              [ 0.13301986, -0.05621156],
#              [-0.12523156,  0.76362648],
#              [ 0.21650757, -0.56529588],
#              [-0.84765129, -0.11560341]])
```
Let us examine the relationship between the singular values and the eigenvalues:

```python
print(s)
# [6.001041088520536,3.0530049438580336]
```

We get the eigenvalues by taking squares of the singular values:

```python
evs = s * s
print(evs)
# [36.012494146111734,9.320839187221594]
```

Now we compute the percentage of variance captured and compare with the above to verify (see/search `model.explainedVariance`):

```python
evs / sum(evs)
# DenseVector([0.7944, 0.2056])
```
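The whole chain — center, take the SVD, square the singular values, normalise — can be reproduced locally with NumPy on the same toy data (a sketch, independent of Spark):

```python
import numpy as np

X = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])
Xc = X - X.mean(axis=0)                  # center the data
s = np.linalg.svd(Xc, compute_uv=False)  # singular values of the centered matrix
evs = s ** 2                             # eigenvalues of Xc^T Xc
print(s[:2])                 # ≈ [6.0010, 3.0530], as above
print(evs[:2] / evs.sum())   # ≈ [0.7944, 0.2056], the explained variance
```

Centering removes one degree of freedom from the three rows, so the third singular value is (numerically) zero and only the top two matter.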
## 3. Exercises

### PCA on iris

Study the [Iris flower data set](https://en.wikipedia.org/wiki/Iris_flower_data_set) `iris.csv` under `Data` with PCA.

1. Follow [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/) to do the same analysis using the DataFrame-based PCA `pca.fit()` from `pyspark.ml`.
2. Follow this lab to verify that the other two RDD-based PCA APIs, `computePrincipalComponents` and `computeSVD`, give the same PCA features.

## 4. Additional ideas to explore (*optional*)

### [HR analytics](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html)

A company is trying to figure out why their best and most experienced employees are leaving prematurely, using this [dataset](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv). Follow the example [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained) to perform such an analysis in PySpark, using as many PySpark APIs as possible.

### Word meaning extraction

Use PySpark to perform the steps in IBM's notebook on [Spark-based machine learning for word meanings](https://github.com/IBMDataScience/word2vec/blob/master/Spark-based%20machine%20learning%20for%20word%20meanings.ipynb), which makes use of PCA, k-means, and Word2Vec to learn word meanings.
### Bag of words analysis

Choose a [Bag of Words Data Set](https://archive.ics.uci.edu/ml/datasets/Bag+of+Words). Let us take the **NIPS full papers** data as an example.

The format of this data is:

```markdown
Number of documents
Number of words in the vocabulary
Total number of words in the collection
docID wordID count
docID wordID count
...
docID wordID count
```

Our data matrix will be $doc \times wordcount$. To begin, we need to read this data in. Possible steps would include:

1. extract the number of documents and the size of the vocabulary, and strip off the first 3 lines
2. combine the words per document
3. create sparse vectors (for better space efficiency)

Start from a small dataset to test your work, and then check **whether** your work scales up to the big **NYTIMES** bag-of-words data. Keep everything as parallel as possible.
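Steps 1–3 above can be prototyped in plain Python before moving to RDDs (a sketch with a hypothetical `parse_bow` helper; the real pipeline would use `sc.textFile` and `Vectors.sparse`):

```python
def parse_bow(lines):
    """Parse bag-of-words lines into (n_docs, vocab_size, {docID: {wordID: count}})."""
    n_docs = int(lines[0])       # header line 1: number of documents
    vocab_size = int(lines[1])   # header line 2: vocabulary size
    # lines[2] is the total word count; skip it with the other header lines.
    docs = {}
    for line in lines[3:]:
        doc_id, word_id, count = map(int, line.split())
        docs.setdefault(doc_id, {})[word_id] = count  # combine words per document
    return n_docs, vocab_size, docs

sample = ["2", "3", "4", "1 1 2", "1 3 1", "2 2 5"]
print(parse_bow(sample))
# (2, 3, {1: {1: 2, 3: 1}, 2: {2: 5}})
```

Each per-document dictionary maps directly onto a sparse vector of length `vocab_size`.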
### Large image datasets

Find some large-scale image datasets to examine the principal components and explore low-dimensional representations.
