Apache Spark on Anaconda Python 3

© 2016 - Harald Schilly <harald@schil.ly> - CC BY-SA 4.0

Download

Configuration

  • Anaconda Python 3 + scientific Python stack
  • Apache Spark 1.6.0

~/.zshrc:

export SPARK_HOME="/opt/spark/current/"
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
export PATH="$PATH":$SPARK_HOME/bin
export PYSPARK_PYTHON=/opt/anaconda/bin/python


$SPARK_HOME/conf/spark-env.sh:

PYSPARK_DRIVER_PYTHON=ipython
PYSPARK_DRIVER_PYTHON_OPTS=notebook

Start using: $ pyspark

http://localhost:4040

In [1]:
import sys
import os
import re
import numpy as np
import pandas as pd
from glob import glob
import operator

print(sys.version)

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn; seaborn.set() # "talk")
3.4.3 |Anaconda 2.3.0 (64-bit)| (default, Oct 19 2015, 21:52:17) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]

Spark Kontext

In [2]:
sc, sqlContext
Out[2]:
(<pyspark.context.SparkContext at 0x7f59a0432080>,
 <pyspark.sql.context.HiveContext at 0x7f59818e1c18>)
In [3]:
"Apache Spark Version %s" % sc.version
Out[3]:
'Apache Spark Version 1.6.0'
In [4]:
xx = sc.range(-200, 300)
xx
Out[4]:
PythonRDD[1] at RDD at PythonRDD.scala:43
In [5]:
values_rdd = xx.map(lambda x : 2 * (x / 100)**2 - 10)
In [6]:
print(values_rdd.toDebugString().decode("utf8"))
(2) PythonRDD[2] at RDD at PythonRDD.scala:43 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
In [7]:
yy = values_rdd.collect()
In [8]:
plt.plot(xx.collect(), yy)
Out[8]:
[<matplotlib.lines.Line2D at 0x7f596cab8eb8>]

Broadcasting

In [9]:
def get_words(f):
    x = f.zipWithIndex().filter(lambda x : 
            "Ende dieses Projekt Gutenberg Etextes" in x[0] or 
            "End of the Project Gutenberg EBook of Faust" in x[0]).first()[1]
    length = sc.broadcast(x)
    lines = f.zipWithIndex().filter(lambda x : x[1] > 40 and x[1] < length.value)
    return lines.flatMap(lambda x : x[0].split())
In [10]:
words = sc.emptyRDD()
for fn in glob("*.txt"):
    f = sc.textFile(fn, minPartitions=10)
    words = words.union(get_words(f))
In [11]:
print(words.toDebugString().decode("utf8"))
(20) UnionRDD[22] at union at NativeMethodAccessorImpl.java:-2 []
 |   UnionRDD[13] at union at NativeMethodAccessorImpl.java:-2 []
 |   PythonRDD[11] at RDD at PythonRDD.scala:43 []
 |   EmptyRDD[3] at emptyRDD at NativeMethodAccessorImpl.java:-2 []
 |   PythonRDD[12] at RDD at PythonRDD.scala:43 []
 |   MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:-2 []
 |   goethe-faust1.txt HadoopRDD[4] at textFile at NativeMethodAccessorImpl.java:-2 []
 |   PythonRDD[21] at RDD at PythonRDD.scala:43 []
 |   MapPartitionsRDD[15] at textFile at NativeMethodAccessorImpl.java:-2 []
 |   goethe-faust2.txt HadoopRDD[14] at textFile at NativeMethodAccessorImpl.java:-2 []
In [12]:
words.count()
Out[12]:
75971
In [13]:
# rough check, with header/footer:
!  cat *.txt | wc -w
82136

Accumulators

In [14]:
chars = sc.accumulator(0)
In [15]:
words.foreach(lambda w : chars.add(len(w)))
In [16]:
chars.value
Out[16]:
400674
In [17]:
# rough check, with header/footer:
!  cat *.txt | wc -c
574503

Map/Reduce

Alle nicht-Buchstaben herausfiltern.

In [18]:
import re
pat = re.compile(r'[^a-zA-ZöäüÖÄÜß ]+', re.IGNORECASE)
In [19]:
wordtuples = words.map(lambda w : (pat.sub('', w), 1))
wordcounts = wordtuples.reduceByKey(lambda c1, c2 : c1 + c2)
wordcounts.takeOrdered(30, key = lambda x : -x[1])
Out[19]:
[('und', 1264),
 ('ich', 1085),
 ('der', 1063),
 ('die', 1041),
 ('zu', 910),
 ('nicht', 874),
 ('Und', 822),
 ('sich', 732),
 ('ist', 667),
 ('ein', 626),
 ('das', 623),
 ('sie', 554),
 ('in', 542),
 ('es', 542),
 ('den', 498),
 ('Die', 496),
 ('du', 494),
 ('mich', 463),
 ('MEPHISTOPHELES', 442),
 ('mir', 437),
 ('so', 403),
 ('Ich', 396),
 ('er', 395),
 ('dem', 395),
 ('mit', 393),
 ('Das', 385),
 ('Der', 385),
 ('auf', 360),
 ('wie', 350),
 ('FAUST', 349)]
In [20]:
! cat *.txt | grep -c "MEPHISTOPHELES"
442

Kontrolle: Summe aller Wortlängen

In [21]:
from operator import add
wordcounts.map(lambda x : x[1]).reduce(add)
Out[21]:
75971

Längste Wörter

In [22]:
words.map(lambda w : (len(w), w)).top(10)
Out[22]:
[(28, 'Untätigkeits-Entschuldigung:'),
 (28, 'Fettbauch-Krummbein-Schelme.'),
 (25, 'Kalenderei--Chymisterei--'),
 (25, 'Einsiedlerisch-beschränkt'),
 (25, 'Dreinamig-Dreigestaltete,'),
 (23, 'Schneckeschnickeschnack'),
 (22, 'allerliebst-geselliger'),
 (22, 'Flügelflatterschlagen!'),
 (22, 'Bürger-Nahrungs-Graus,'),
 (21, 'heimlich-kätzchenhaft')]

Iterationen

In [23]:
x = 1000
words.cache()
Out[23]:
UnionRDD[22] at union at NativeMethodAccessorImpl.java:-2
In [56]:
words_indexed = words.zipWithIndex().cache()

for i in range(20):
    d = 1 / (2 + i)
    v = words_indexed\
        .filter(lambda e : e[1] < x)\
        .map(lambda w : len(w[0])).reduce(operator.add)
    if v > 3000:
        x -= d * x
    else:
        x += d * x
    print("x[{:2d}] = {:9.2f} → v = {:d}    (d = {:.2f})".format(i, x, v, d))
x[ 0] =    857.14 → v = 2998    (d = 0.50)
x[ 1] =    571.43 → v = 4463    (d = 0.33)
x[ 2] =    714.29 → v = 2998    (d = 0.25)
x[ 3] =    571.43 → v = 3729    (d = 0.20)
x[ 4] =    666.67 → v = 2998    (d = 0.17)
x[ 5] =    571.43 → v = 3501    (d = 0.14)
x[ 6] =    642.86 → v = 2998    (d = 0.12)
x[ 7] =    571.43 → v = 3386    (d = 0.11)
x[ 8] =    628.57 → v = 2998    (d = 0.10)
x[ 9] =    571.43 → v = 3319    (d = 0.09)
x[10] =    619.05 → v = 2998    (d = 0.08)
x[11] =    571.43 → v = 3280    (d = 0.08)
x[12] =    612.24 → v = 2998    (d = 0.07)
x[13] =    571.43 → v = 3234    (d = 0.07)
x[14] =    607.14 → v = 2998    (d = 0.06)
x[15] =    571.43 → v = 3208    (d = 0.06)
x[16] =    603.17 → v = 2998    (d = 0.06)
x[17] =    571.43 → v = 3174    (d = 0.05)
x[18] =    600.00 → v = 2998    (d = 0.05)
x[19] =    571.43 → v = 3149    (d = 0.05)

PageRank

In [25]:
link_data = [
    (0, 1),
    (0, 2),
    (0, 3),
    (1, 3),
    (1, 3),
    (2, 3),
    (3, 0),
    (4, 0),
    (2, 3),
    (4, 1)
]
In [26]:
links = sc.parallelize(link_data).distinct().groupByKey().cache()
In [27]:
# init rank data
ranks = links.map(lambda l: (l[0], 1.0))
In [28]:
from operator import add

def computeContribs(urls, rank):
    num_urls = len(urls)
    for url in urls:
        yield url, rank / num_urls

for iteration in range(10):
    # Calculates contribution of link to neighbour
    contribs = links.join(ranks)\
        .flatMap(lambda url_urls_rank: computeContribs(*url_urls_rank[1]))

    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

# Collects all URL ranks and dump them to console.
for (link, rank) in sorted(ranks.collect()):
    print("%s has rank: %s." % (link, rank))
0 has rank: 1.4925426310288215.
1 has rank: 0.569011704714285.
2 has rank: 0.569011704714285.
3 has rank: 1.5663083638833293.

Histogram

In [29]:
import string
U = string.ascii_uppercase
In [30]:
chars = words.flatMap(lambda x : list(x)) \
    .map(lambda c : c.upper()) \
    .filter(lambda c : c in string.ascii_uppercase)
chars.cache()
print(chars.collect()[:100])
['F', 'A', 'U', 'S', 'T', 'D', 'E', 'R', 'T', 'R', 'A', 'G', 'D', 'I', 'E', 'E', 'R', 'S', 'T', 'E', 'R', 'T', 'E', 'I', 'L', 'J', 'O', 'H', 'A', 'N', 'N', 'W', 'O', 'L', 'F', 'G', 'A', 'N', 'G', 'V', 'O', 'N', 'G', 'O', 'E', 'T', 'H', 'E', 'Z', 'U', 'E', 'I', 'G', 'N', 'U', 'N', 'G', 'I', 'H', 'R', 'N', 'A', 'H', 'T', 'E', 'U', 'C', 'H', 'W', 'I', 'E', 'D', 'E', 'R', 'S', 'C', 'H', 'W', 'A', 'N', 'K', 'E', 'N', 'D', 'E', 'G', 'E', 'S', 'T', 'A', 'L', 'T', 'E', 'N', 'D', 'I', 'E', 'F', 'R', 'H']
In [31]:
char_freq = chars.countByKey()
char_freq
Out[31]:
defaultdict(int,
            {'A': 18156,
             'B': 6805,
             'C': 14955,
             'D': 17899,
             'E': 58860,
             'F': 6385,
             'G': 11132,
             'H': 24159,
             'I': 30167,
             'J': 712,
             'K': 4438,
             'L': 15773,
             'M': 11007,
             'N': 35768,
             'O': 8644,
             'P': 3231,
             'Q': 134,
             'R': 25718,
             'S': 25371,
             'T': 23848,
             'U': 14859,
             'V': 2613,
             'W': 7172,
             'X': 97,
             'Y': 199,
             'Z': 4197})
In [32]:
fig,ax = plt.subplots()
_ = ax.bar(range(len(U)), [char_freq[_] for _ in U])
_ = ax.set_xticks(np.arange(len(U)) + .35)
_ = ax.set_xticklabels(U)
plt.plot()
Out[32]:
[]

Lineare Regression

Zufällige Daten generieren …

In [33]:
data = np.c_[
    sorted(2 * np.random.rand(40)),
    np.r_[np.random.randn(20) + np.linspace(0, 3, 20),
          np.random.randn(20) + np.linspace(0, 2, 20) + 3]
]
plt.scatter(data[:,0], data[:,1])
Out[33]:
<matplotlib.collections.PathCollection at 0x7f596c0187b8>
In [34]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

Datenpunkte und dazugehöriger Featurevektor (exogen x → endogen y)

In [35]:
sparkdata = sc.parallelize(LabeledPoint(y, [x]) for x, y in data)
In [36]:
lm = LinearRegressionWithSGD.train(sparkdata)
lm
Out[36]:
(weights=[2.67100279627], intercept=0.0)
In [37]:
valuesAndPreds = sparkdata.map(lambda p: (p.label, lm.predict(p.features)))
MSE = valuesAndPreds.map(lambda v_p: (v_p[0] - v_p[1])**2).sum() / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
Mean Squared Error = 1.13026470359
In [38]:
xx = np.linspace(-1, 3, 100)
yy = [lm.predict([_]) for _ in xx]
In [39]:
fig, ax = plt.subplots()
_ = ax.scatter(data[:,0], data[:,1])
_ = ax.plot(xx, yy, color="green")
plt.show()

Isotonic Regression

In [40]:
from pyspark.mllib.regression import IsotonicRegression
In [41]:
sparkiso = sc.parallelize((y, x, 1) for x, y in data)
In [42]:
ir = IsotonicRegression.train(sparkiso)
In [43]:
valuesAndPreds2 = sparkdata.map(lambda p: (p.label, ir.predict(p.features)))
MSE = valuesAndPreds2.map(lambda v_p: (v_p[0] - v_p[1])**2).sum() / valuesAndPreds2.count()
print("Mean Squared Error = " + str(MSE))
Mean Squared Error = [ 0.74351952]
In [44]:
yy2 = [ir.predict([_]) for _ in xx]
In [45]:
fig, ax = plt.subplots()
_ = ax.scatter(data[:,0], data[:,1])
_ = ax.plot(xx, yy2, color="green")
plt.show()

DataFrames

In [46]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
In [47]:
entries = sc.textFile("elements.csv").map(lambda line : line.split(","))
print(', '.join(entries.first()))
Electronegativity,  Calculated Radius,  First Ionization,  Core Configuration,  Heat of Vapor,  Covalent Radius,  Heat of Fusion,  Bulk Modulus,  Boiling Point,  Brinell Hardness,  Melting Point,  Symbol,  STP Density,  Young Modulus,  Shear Modulus,  Vickers Hardness,  Name,  Common Ions,  Second Ionization,  Mass,  Van der Waals Radius,  Specific Heat,  Thermal Cond.,  Third Ionization,  Series,  Electron Affinity,  Atomic Number,  Mohs Hardness,  Empirical Radius

Converting second to last row into "Row"s

In [48]:
def try_float(x):
    try:
        return float(x)
    except:
        np.nan

elements = entries.zipWithIndex()\
    .filter(lambda x : x[1] >= 1).map(lambda x : x[0]) \
    .map(lambda e: Row(
        number = int(e[-3]),
        symbol=e[11],
        name=e[16],
        mass=try_float(e[19]),
        radius = try_float(e[-1])
    ))

Registering Schema

In [49]:
elementsDF = sqlContext.createDataFrame(elements)
elementsDF.registerTempTable("elements")

Inferring types

In [50]:
elementsDF.printSchema()
root
 |-- mass: double (nullable = true)
 |-- name: string (nullable = true)
 |-- number: long (nullable = true)
 |-- radius: double (nullable = true)
 |-- symbol: string (nullable = true)

Query DataFrame with SQL

In [51]:
heavy = sqlContext.sql("SELECT number, name, mass FROM elements WHERE mass >= 250 SORT BY number")
heavy.collect()
Out[51]:
[Row(number=6, name='Carbon', mass=2352.6),
 Row(number=22, name='Titanium', mass=1309.8),
 Row(number=24, name='Chromium', mass=1590.6),
 Row(number=26, name='Iron', mass=1561.9),
 Row(number=27, name='Cobalt', mass=1648.0),
 Row(number=29, name='Copper', mass=1957.9),
 Row(number=50, name='Tin', mass=1411.8),
 Row(number=58, name='Cerium', mass=1050.0),
 Row(number=79, name='Gold', mass=1980.0),
 Row(number=80, name='Mercury', mass=1810.0),
 Row(number=81, name='Thallium', mass=1971.0),
 Row(number=82, name='Lead', mass=1450.5)]
In [52]:
radii = sqlContext.sql("SELECT radius FROM elements SORT BY number")
plt.plot(radii.collect())
Out[52]:
[<matplotlib.lines.Line2D at 0x7f596c0c1390>]

Query DataFrame

In [53]:
elementsDF.show()
+----------+----------+------+------+------+
|      mass|      name|number|radius|symbol|
+----------+----------+------+------+------+
|   1.00794|  Hydrogen|     1|  25.0|     H|
|  4.002602|    Helium|     2|  null|    He|
|     6.941|   Lithium|     3| 145.0|    Li|
|  9.012182| Beryllium|     4| 105.0|    Be|
|    10.811|     Boron|     5|  85.0|     B|
|    2352.6|    Carbon|     6|  70.0|     C|
|   14.0067|  Nitrogen|     7|  65.0|     N|
|   15.9994|    Oxygen|     8|  60.0|     O|
|18.9984032|  Fluorine|     9|  50.0|     F|
|   20.1797|      Neon|    10|  null|    Ne|
|   22.9898|    Sodium|    11| 180.0|    Na|
|    24.305| Magnesium|    12| 150.0|    Mg|
|   26.9815|  Aluminum|    13| 125.0|    Al|
|   28.0855|   Silicon|    14| 110.0|    Si|
|   30.9738|Phosphorus|    15| 100.0|     P|
|    32.065|    Sulfur|    16| 100.0|     S|
|    35.453|  Chlorine|    17| 100.0|    Cl|
|    39.948|     Argon|    18|  71.0|    Ar|
|   39.0983| Potassium|    19| 220.0|     K|
|    40.078|   Calcium|    20| 180.0|    Ca|
+----------+----------+------+------+------+
only showing top 20 rows

In [54]:
df = elementsDF.select('number', 'name', 'radius', 'mass')\
    .filter(elementsDF.mass < 20)\
    .sort('number')
df.show()
+------+---------+------+----------+
|number|     name|radius|      mass|
+------+---------+------+----------+
|     1| Hydrogen|  25.0|   1.00794|
|     2|   Helium|  null|  4.002602|
|     3|  Lithium| 145.0|     6.941|
|     4|Beryllium| 105.0|  9.012182|
|     5|    Boron|  85.0|    10.811|
|     7| Nitrogen|  65.0|   14.0067|
|     8|   Oxygen|  60.0|   15.9994|
|     9| Fluorine|  50.0|18.9984032|
+------+---------+------+----------+

In [55]:
df.fillna(0).show()
+------+---------+------+----------+
|number|     name|radius|      mass|
+------+---------+------+----------+
|     1| Hydrogen|  25.0|   1.00794|
|     2|   Helium|   0.0|  4.002602|
|     3|  Lithium| 145.0|     6.941|
|     4|Beryllium| 105.0|  9.012182|
|     5|    Boron|  85.0|    10.811|
|     7| Nitrogen|  65.0|   14.0067|
|     8|   Oxygen|  60.0|   15.9994|
|     9| Fluorine|  50.0|18.9984032|
+------+---------+------+----------+

Future: DataSets