Spark tutorial¶

Install Spark¶

  1. Install Java
sudo apt install default-jdk
java -version
  2. Download Apache Spark and unpack it.
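A typical way to do this (the download URL and version below are illustrative; adjust them to the release you want):
wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
tar -xzf spark-3.3.2-bin-hadoop3.tgz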
sudo mkdir /opt/spark/
sudo mv spark-3.3.2-bin-hadoop3/ /opt/spark/
  3. Install PySpark
pip install pyspark
  4. Set Spark environment variables (append the following two lines to ~/.bashrc)
vi ~/.bashrc
export SPARK_HOME=/opt/spark/spark-3.3.2-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
source ~/.bashrc
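
After reloading ~/.bashrc, you can optionally verify that the Spark binaries are on your PATH; spark-submit should print the installed Spark version:

spark-submit --version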

Spark & Jupyter notebook¶

To set up Spark in Jupyter notebook, do the following:

  1. Add one of the following sets of lines to ~/.bashrc, depending on how you access the notebook:
  • local access
    export PYSPARK_DRIVER_PYTHON=jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
  • remote access
    export PYSPARK_DRIVER_PYTHON=jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=<port> --ip='*'"
  • Windows Subsystem for Linux
    export PYSPARK_DRIVER_PYTHON=jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser"

Don't forget to run source ~/.bashrc at the end.

  2. Run from the terminal:
pyspark

Note that remote access to the Jupyter notebook requires an SSH tunnel. On Windows machines, you can use PuTTY to set it up. In Linux environments, the following command can be used:

ssh -N -L <local_port>:localhost:<port> <user>@<remote_host>

Finally, you can open the notebook in your browser:

http://localhost:<local_port>
In [1]:
import random
import re

The following cell suppresses unnecessary warning output (stderr) produced by newer Jupyter notebook versions.

In [2]:
%%html
<style>
    div.output_stderr {
    display: none;
}
</style>

PySpark Python API¶

PySpark can be used from standalone Python scripts by creating a SparkContext. You can set configuration properties by passing a SparkConf object to SparkContext.

Documentation: pyspark package
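
For example (a sketch; the application name and master URL below are illustrative and not used in the cells that follow), configuration properties can be set on a SparkConf before passing it to SparkContext:

from pyspark import SparkConf

# run locally using all available cores, with an explicit application name
conf = SparkConf().setAppName("spark-tutorial").setMaster("local[*]")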

In [3]:
from pyspark import SparkContext, SparkConf
In [4]:
# only one SparkContext can be active at a time, so stop any existing one just in case
sc = SparkContext.getOrCreate()
sc.stop()
In [5]:
# spark conf
conf = SparkConf()
In [6]:
# create a Spark context
sc = SparkContext(conf=conf)

RDD - Resilient Distributed Datasets¶

resilient:

  • (of a person or animal) able to withstand or recover quickly from difficult conditions
  • (of a substance or object) able to recoil or spring back into shape after bending, stretching, or being compressed

Spark is RDD-centric!

  • RDDs are immutable
  • RDDs are computed lazily (see the sketch after this list)
  • RDDs can be cached
  • RDDs know who their parents are
  • RDDs that contain only tuples of two elements are “pair RDDs”
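
A minimal sketch (not part of the original notebook cells) illustrating the laziness and caching mentioned above, using the SparkContext sc created earlier:

# transformations are lazy: nothing is computed yet
squares = sc.parallelize(range(10)).map(lambda x: x * x)
# cache the RDD so later actions can reuse the computed partitions
squares.cache()
# an action such as count() triggers the actual computation
print(squares.count())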

RDD Actions¶

Some useful actions:

  • take(n) – return the first n elements in the RDD as an array.
  • collect() – return all elements of the RDD as an array. Use with caution.
  • count() – return the number of elements in the RDD as an int.
  • saveAsTextFile(‘path/to/dir’) – save the RDD to files in a directory. Will create the directory if it doesn't exist and will fail if it does (see the sketch after this list).
  • foreach(func) – execute the function against every element in the RDD, but don’t keep any results.
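
For example, saveAsTextFile() (the only action above not demonstrated in the cells below) could be used as follows; this is a sketch, and the data and output directory name are illustrative:

# write the RDD out as a directory of part-* text files (fails if 'demo_output' already exists)
sc.parallelize(['Apple,Amy', 'Butter,Bob']).saveAsTextFile('demo_output')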

Demo files¶

file1.txt:
    Apple,Amy
    Butter,Bob
    Cheese,Chucky
    Dinkel,Dieter
    Egg,Edward
    Oxtail,Oscar
    Anchovie,Alex
    Avocado,Adam
    Apple,Alex
    Apple,Adam
    Dinkel,Dieter
    Doughboy,Pilsbury
    McDonald,Ronald

file2.txt:
    Wendy,
    Doughboy,Pillsbury
    McDonald,Ronald
    Cheese,Chucky
In [7]:
# input files
file1 = 'file1.txt'
file2 = 'file2.txt'
In [8]:
# load data
data1 = sc.textFile(file1)
data2 = sc.textFile(file2)
In [9]:
data1.collect()
Out[9]:
['Apple,Amy',
 'Butter,Bob',
 'Cheese,Chucky',
 'Dinkel,Dieter',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam',
 'Dinkel,Dieter',
 'Doughboy,Pilsbury',
 'McDonald,Ronald']
In [10]:
print("file1: %d lines" % data1.count())
file1: 13 lines
In [11]:
data1.take(3)
Out[11]:
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
In [12]:
data2.collect()
Out[12]:
['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']
In [13]:
print("file2: %d lines" % data2.count())
file2: 4 lines
In [14]:
data2.take(3)
Out[14]:
['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald']

Note: foreach() runs on the Spark workers, so the following may print to the Jupyter notebook server console instead of the cell output!

In [15]:
# prints each element in the Jupyter notebook output
data2.foreach(print)
Cheese,Chucky
Wendy,
Doughboy,Pillsbury
McDonald,Ronald

RDD Operations¶

map()¶

Return a new RDD by applying a function to each element of this RDD.

  • apply an operation to every element of an RDD
  • return a new RDD that contains the results
In [16]:
data = sc.textFile(file1)
In [17]:
data
Out[17]:
file1.txt MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0
In [18]:
data.take(3)
Out[18]:
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
In [19]:
data.map(lambda line: line.split(',')).take(3)
Out[19]:
[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]

flatMap()¶

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

  • apply an operation to the value of every element of an RDD
  • return a new RDD that contains the results after dropping the outermost container
In [20]:
data = sc.textFile(file1)
In [21]:
data.take(4)
Out[21]:
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky', 'Dinkel,Dieter']
In [22]:
data.flatMap(lambda line: line.split(',')).take(7)
Out[22]:
['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']

mapValues()¶

Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.

  • apply an operation to the value of every element of an RDD
  • return a new RDD that contains the results

Only works with pair RDDs.

In [23]:
data = sc.textFile(file1)
In [24]:
data = data.map(lambda line: line.split(','))
In [25]:
data.take(3)
Out[25]:
[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]
In [26]:
data.collect()
Out[26]:
[['Apple', 'Amy'],
 ['Butter', 'Bob'],
 ['Cheese', 'Chucky'],
 ['Dinkel', 'Dieter'],
 ['Egg', 'Edward'],
 ['Oxtail', 'Oscar'],
 ['Anchovie', 'Alex'],
 ['Avocado', 'Adam'],
 ['Apple', 'Alex'],
 ['Apple', 'Adam'],
 ['Dinkel', 'Dieter'],
 ['Doughboy', 'Pilsbury'],
 ['McDonald', 'Ronald']]
In [27]:
data = data.map(lambda pair: (pair[0], pair[1]))
In [28]:
data.take(3)
Out[28]:
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
In [29]:
data.mapValues(lambda name: name.lower()).take(3)
Out[29]:
[('Apple', 'amy'), ('Butter', 'bob'), ('Cheese', 'chucky')]

flatMapValues()¶

Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.

  • apply an operation to the value of every element of an RDD
  • return a new RDD that contains the results after removing the outermost container

Only works with pair RDDs.

In [30]:
data = sc.textFile(file1)
In [31]:
data = data.map(lambda line: line.split(','))
In [32]:
data = data.map(lambda pair: (pair[0], pair[1]))
In [33]:
data.take(3)
Out[33]:
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
In [34]:
data.flatMapValues(lambda name: name.lower()).take(9)
Out[34]:
[('Apple', 'a'),
 ('Apple', 'm'),
 ('Apple', 'y'),
 ('Butter', 'b'),
 ('Butter', 'o'),
 ('Butter', 'b'),
 ('Cheese', 'c'),
 ('Cheese', 'h'),
 ('Cheese', 'u')]

filter()¶

Return a new RDD containing only the elements that satisfy a predicate.

  • return a new RDD that contains only the elements that pass a filter operation
In [35]:
data = sc.textFile(file1)
In [36]:
data.take(3)
Out[36]:
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
In [37]:
# keep lines that start with an uppercase vowel
data.filter(lambda line: re.match(r'^[AEIOU]', line)).take(3)
Out[37]:
['Apple,Amy', 'Egg,Edward', 'Oxtail,Oscar']
In [38]:
data.filter(lambda line: re.match(r'^[AEIOU]', line)).collect()
Out[38]:
['Apple,Amy',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam']
In [39]:
# keep lines that end with 'y'
data.filter(lambda line: re.match(r'.+[y]$', line)).take(3)
Out[39]:
['Apple,Amy', 'Cheese,Chucky', 'Doughboy,Pilsbury']
In [40]:
# keep lines that end with 'x' (using re.search instead of re.match)
data.filter(lambda line: re.search(r'[x]$', line)).take(3)
Out[40]:
['Anchovie,Alex', 'Apple,Alex']

groupByKey()¶

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.

  • group all values that share the same key
  • return a new RDD in which each element is a pair of a key and an iterable of its values

Only works with pair RDDs.

In [41]:
data = sc.textFile(file1)
In [42]:
data = data.map(lambda line: line.split(','))
In [43]:
data.take(3)
Out[43]:
[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]
In [44]:
data = data.map(lambda pair: (pair[0], pair[1]))
In [45]:
data.take(3)
Out[45]:
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
In [46]:
data.groupByKey().take(1)
Out[46]:
[('Apple', <pyspark.resultiterable.ResultIterable at 0x7f2ba8087b50>)]
In [47]:
for pair in data.groupByKey().take(1):
    print("%s: %s" % (pair[0], ",".join([n for n in pair[1]])))
Apple: Amy,Alex,Adam

reduceByKey()¶

Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

  • combine elements of an RDD by key and then
  • apply a reduce operation to pairs of values
  • until only a single value remains for each key
  • return the results in a new RDD
In [48]:
data = sc.textFile(file1)
In [49]:
data = data.map(lambda line: line.split(","))
In [50]:
data = data.map(lambda pair: (pair[0], pair[1]))
In [51]:
data.take(3)
Out[51]:
[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]
In [52]:
data.reduceByKey(lambda v1, v2: v1 + ":" + v2).take(6)
Out[52]:
[('Apple', 'Amy:Alex:Adam'),
 ('Butter', 'Bob'),
 ('Dinkel', 'Dieter:Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('Cheese', 'Chucky'),
 ('Egg', 'Edward')]

sortBy()¶

Sorts this RDD by the given keyfunc.

  • sort an RDD according to a sorting function
  • return the results in a new RDD
In [53]:
data = sc.textFile(file1)
In [54]:
data = data.map(lambda line: line.split(","))
In [55]:
data = data.map(lambda pair: (pair[0], pair[1]))
In [56]:
data.collect()
Out[56]:
[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]
In [57]:
# sort by the second character of the value (the name)
data.sortBy(lambda pair: pair[1][1]).take(10)
Out[57]:
[('Egg', 'Edward'),
 ('Avocado', 'Adam'),
 ('Apple', 'Adam'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('Anchovie', 'Alex'),
 ('Apple', 'Alex'),
 ('Apple', 'Amy')]

sortByKey()¶

Sorts this RDD, which is assumed to consist of (key, value) pairs.

  • sort an RDD according to the natural ordering of the keys
  • return the results in a new RDD
In [58]:
data = sc.textFile(file1)
In [59]:
data = data.map(lambda line: line.split(","))
In [60]:
data = data.map(lambda pair: (pair[0], pair[1]))
In [61]:
data.collect()
Out[61]:
[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]
In [62]:
data.sortByKey().take(6)
Out[62]:
[('Anchovie', 'Alex'),
 ('Apple', 'Amy'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Avocado', 'Adam'),
 ('Butter', 'Bob')]

subtract()¶

Return each value in self that is not contained in other.

  • return a new RDD that contains all the elements from the original RDD
  • that do not appear in a target RDD
In [63]:
data1 = sc.textFile(file1)
In [64]:
data1.collect()
Out[64]:
['Apple,Amy',
 'Butter,Bob',
 'Cheese,Chucky',
 'Dinkel,Dieter',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam',
 'Dinkel,Dieter',
 'Doughboy,Pilsbury',
 'McDonald,Ronald']
In [65]:
data1.count()
Out[65]:
13
In [66]:
data2 = sc.textFile(file2)
In [67]:
data2.collect()
Out[67]:
['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']
In [68]:
data2.count()
Out[68]:
4
In [69]:
data1.subtract(data2).collect()
Out[69]:
['Egg,Edward',
 'Doughboy,Pilsbury',
 'Oxtail,Oscar',
 'Apple,Alex',
 'Apple,Amy',
 'Butter,Bob',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Dinkel,Dieter',
 'Dinkel,Dieter',
 'Apple,Adam']
In [70]:
data1.subtract(data2).count()
Out[70]:
11

join()¶

Return an RDD containing all pairs of elements with matching keys in self and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.

  • return a new RDD that contains all the elements from the original RDD
  • joined (inner join) with elements from the target RDD
In [71]:
data1 = sc.textFile(file1).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))
In [72]:
data1.collect()
Out[72]:
[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]
In [73]:
data1.count()
Out[73]:
13
In [74]:
data2 = sc.textFile(file2).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))
In [75]:
data2.collect()
Out[75]:
[('Wendy', ''),
 ('Doughboy', 'Pillsbury'),
 ('McDonald', 'Ronald'),
 ('Cheese', 'Chucky')]
In [76]:
data2.count()
Out[76]:
4
In [77]:
data1.join(data2).collect()
Out[77]:
[('Doughboy', ('Pilsbury', 'Pillsbury')),
 ('McDonald', ('Ronald', 'Ronald')),
 ('Cheese', ('Chucky', 'Chucky'))]
In [78]:
data1.join(data2).count()
Out[78]:
3
In [79]:
# a full outer join also keeps keys that appear in only one of the two RDDs
data1.fullOuterJoin(data2).take(2)
Out[79]:
[('Dinkel', ('Dieter', None)), ('Dinkel', ('Dieter', None))]
In [80]:
# stop Spark context
sc.stop()

MapReduce demo¶

We will now count the occurrences of each word. Word count is the typical "Hello, world!" app for Spark; the map/reduce model is particularly well suited to applications like counting words in a document.

In [81]:
# create a Spark context
sc = SparkContext(conf=conf)
In [82]:
# read the target file into an RDD
lines = sc.textFile(file1)
lines.take(3)
Out[82]:
['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']

The flatMap() operation first converts each line into an array of words, and then makes each of the words an element in the new RDD.

In [83]:
# split the lines into individual words
words = lines.flatMap(lambda l: re.split(r'[^\w]+', l))
words.take(3)
Out[83]:
['Apple', 'Amy', 'Butter']

The map() operation replaces each word with a tuple of that word and the number 1. The resulting pairs RDD is a pair RDD in which each word is a key and every value is the number 1.

In [84]:
# replace each word with a tuple of that word and the number 1
pairs = words.map(lambda w: (w, 1))
pairs.take(3)
Out[84]:
[('Apple', 1), ('Amy', 1), ('Butter', 1)]

The reduceByKey() operation keeps adding the values together for each key (word) until only a single total per key remains.

In [85]:
# group the elements of the RDD by key (word) and add up their values
counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)
counts.take(3)
Out[85]:
[('Apple', 3), ('Amy', 1), ('Butter', 1)]
In [86]:
# sort the elements by values in descending order
counts.sortBy(lambda pair: pair[1], ascending=False).take(10)
Out[86]:
[('Apple', 3),
 ('Dinkel', 2),
 ('Alex', 2),
 ('Dieter', 2),
 ('Adam', 2),
 ('Amy', 1),
 ('Butter', 1),
 ('Chucky', 1),
 ('Edward', 1),
 ('Doughboy', 1)]

Simplify chained transformations¶

Note that the code above can also be written as a single chain of transformations:

In [87]:
sorted_counts = (lines.flatMap(lambda l: re.split(r'[^\w]+', l))       # words
                      .map(lambda w: (w, 1))                           # pairs
                      .reduceByKey(lambda n1, n2: n1 + n2)             # counts
                      .sortBy(lambda pair: pair[1], ascending=False))  # sorted counts
In [88]:
sorted_counts.take(10)
Out[88]:
[('Apple', 3),
 ('Dinkel', 2),
 ('Alex', 2),
 ('Dieter', 2),
 ('Adam', 2),
 ('Amy', 1),
 ('Butter', 1),
 ('Chucky', 1),
 ('Edward', 1),
 ('Doughboy', 1)]
In [89]:
# stop Spark context
sc.stop()