AWS EMR Spark for fast data analysis

Set up AWS EMR with Hadoop and Spark

Create a new EMR cluster

Select the advanced options
Select all the software you want to test
Bid 20% to 50% above the current spot price for your spot instances
In this case the default security settings will do; from here we create an AWS EMR cluster for our Spark data analysis (an equivalent AWS CLI command is sketched after these steps)
It might take a while for the cluster instances to come up.
Please ensure SSH (port 22) is open on the master instance's security group so you can connect with PuTTY
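
For those who prefer scripting the cluster creation over the console wizard, something along these lines with the AWS CLI should do the same job. This is only a sketch: the cluster name, EMR release label, instance types, counts, spot bid price and key pair name are placeholders you will need to adjust for your own account and region.

# sketch: create a Hadoop + Spark EMR cluster with spot core nodes (all values are placeholders)
aws emr create-cluster \
  --name "spark-analysis" \
  --release-label emr-4.7.0 \
  --applications Name=Hadoop Name=Spark \
  --use-default-roles \
  --ec2-attributes KeyName=my-key-pair \
  --instance-groups \
      InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
      InstanceGroupType=CORE,InstanceCount=2,InstanceType=m3.xlarge,BidPrice=0.10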

You should see the instances ready in the EC2 dashboard

Click the “Actions” button and select “Connect”; all the SSH connection details will be shown

Alternatively, you can find the SSH connection details on the EMR dashboard

Connect using PuTTY
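
If you are on Linux or macOS rather than Windows, a plain SSH client works just as well as PuTTY; a minimal sketch, using the key pair file and master public DNS name from your own Connect dialog (both placeholders here):

chmod 400 ~/my-key-pair.pem
ssh -i ~/my-key-pair.pem hadoop@ec2-xx-xx-xx-xx.compute-1.amazonaws.com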

# download and install Anaconda
sudo wget http://repo.continuum.io/archive/Anaconda2-4.0.0-Linux-x86_64.sh
bash Anaconda2-4.0.0-Linux-x86_64.sh
# after installation, restart your console for the changes to take effect
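
PySpark uses whichever interpreter the PYSPARK_PYTHON environment variable points to, so if you want the Anaconda Python (rather than the system Python) to drive your Spark jobs, you can export it in your shell profile. A sketch, assuming Anaconda was installed to its default location under the hadoop user's home directory:

# point PySpark at the Anaconda interpreter (path assumes the default install location)
export PYSPARK_PYTHON=/home/hadoop/anaconda2/bin/python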

We will use the MovieLens 100k dataset for demonstration purposes.

#download the MovieLens 100k data from the URL below
[hadoop@ip-xx ~]$ sudo wget http://files.grouplens.org/datasets/movielens/ml-100k.zip

#unzip the file
[hadoop@ip-xx ~]$ unzip ml-100k.zip

#change directory
[hadoop@ip-xx ~]$ cd ml-100k

#check the first 5 rows of a particular file
[hadoop@ip-1xx ~]$ head -5 u.user

#copy the files into HDFS
[hadoop@ip-xx ~]$ hadoop fs -put /home/hadoop/ml-100k  /user/hadoop/

#check that all the files were copied correctly
[hadoop@ip-1xx ~]$ hadoop fs -ls /user/hadoop/ml-100k
Found 23 items
-rw-r--r--   1 hadoop hadoop       6750 2016-05-28 14:33 /user/hadoop/ml-100k/README
-rw-r--r--   1 hadoop hadoop        716 2016-05-28 14:33 /user/hadoop/ml-100k/allbut.pl
-rw-r--r--   1 hadoop hadoop        643 2016-05-28 14:33 /user/hadoop/ml-100k/mku.sh
-rw-r--r--   1 hadoop hadoop    1979173 2016-05-28 14:33 /user/hadoop/ml-100k/u.data
-rw-r--r--   1 hadoop hadoop        202 2016-05-28 14:33 /user/hadoop/ml-100k/u.genre
-rw-r--r--   1 hadoop hadoop         36 2016-05-28 14:33 /user/hadoop/ml-100k/u.info
-rw-r--r--   1 hadoop hadoop     236344 2016-05-28 14:33 /user/hadoop/ml-100k/u.item
-rw-r--r--   1 hadoop hadoop        193 2016-05-28 14:33 /user/hadoop/ml-100k/u.occupation
-rw-r--r--   1 hadoop hadoop      22628 2016-05-28 14:33 /user/hadoop/ml-100k/u.user
-rw-r--r--   1 hadoop hadoop    1586544 2016-05-28 14:33 /user/hadoop/ml-100k/u1.base
-rw-r--r--   1 hadoop hadoop     392629 2016-05-28 14:33 /user/hadoop/ml-100k/u1.test
-rw-r--r--   1 hadoop hadoop    1583948 2016-05-28 14:33 /user/hadoop/ml-100k/u2.base
-rw-r--r--   1 hadoop hadoop     395225 2016-05-28 14:33 /user/hadoop/ml-100k/u2.test
-rw-r--r--   1 hadoop hadoop    1582546 2016-05-28 14:33 /user/hadoop/ml-100k/u3.base
-rw-r--r--   1 hadoop hadoop     396627 2016-05-28 14:33 /user/hadoop/ml-100k/u3.test
-rw-r--r--   1 hadoop hadoop    1581878 2016-05-28 14:33 /user/hadoop/ml-100k/u4.base
-rw-r--r--   1 hadoop hadoop     397295 2016-05-28 14:33 /user/hadoop/ml-100k/u4.test
-rw-r--r--   1 hadoop hadoop    1581776 2016-05-28 14:33 /user/hadoop/ml-100k/u5.base
-rw-r--r--   1 hadoop hadoop     397397 2016-05-28 14:33 /user/hadoop/ml-100k/u5.test
-rw-r--r--   1 hadoop hadoop    1792501 2016-05-28 14:33 /user/hadoop/ml-100k/ua.base
-rw-r--r--   1 hadoop hadoop     186672 2016-05-28 14:33 /user/hadoop/ml-100k/ua.test
-rw-r--r--   1 hadoop hadoop    1792476 2016-05-28 14:33 /user/hadoop/ml-100k/ub.base
-rw-r--r--   1 hadoop hadoop     186697 2016-05-28 14:33 /user/hadoop/ml-100k/ub.test

#start a pyspark console
[hadoop@ip-1xx ~]$ pyspark

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.10 (default, Dec  8 2015 18:25:23)
SparkContext available as sc, HiveContext available as sqlContext.

#load u.user into the RDD
>>> user_data = sc.textFile("ml-100k/u.user")

#check the first record of the RDD; you will see the fields are separated by "|"
>>> user_data.first()
u'1|24|M|technician|85711'

# split the columns by "|"
>>> user_fields = user_data.map(lambda line: line.split("|"))

#count number of users
>>> num_users = user_fields.map(lambda fields: fields[0]).count()

# distinct count by gender
>>> num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()

# distinct count by occupation
>>> num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()

# distinct count by zip codes
>>> num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()

# print all the results
>>> print "Users: %d, genders: %d, occupations: %d, Zip Codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes)
Users: 943, genders: 2, occupations: 21, Zip Codes: 795
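
As a further sketch along the same lines (not run here, so no output is shown), we can reuse the same user_fields RDD to break the user count down by occupation with countByValue, which returns a Python dict of occupation counts on the driver:

# count users per occupation and show the five most common occupations
>>> occupation_counts = user_fields.map(lambda fields: fields[3]).countByValue()
>>> sorted(occupation_counts.items(), key=lambda kv: -kv[1])[:5]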

The above is just a simple example of how we can do quick data analysis via the PySpark console. Now we use IPython instead of the standard Python shell for further demonstration:

>>> exit()

[hadoop@xx ~]$ IPYTHON=1 pyspark

In [1]: import re

In [2]: from operator import add

# count the number of lines in the file
In [3]: file_in = sc.textFile("ml-100k/README")

In [4]: print('number of lines in file: %s' % file_in.count())
number of lines in file: 157
# count the number of characters in the file
In [6]: chars = file_in.map(lambda s: len(s)).reduce(add)
In [7]: print('number of characters in file: %s' % chars)
number of characters in file: 6593
# split each line of the file into lowercase words
In [9]: words = file_in.flatMap(lambda line: re.split('\W+', line.lower().strip()))
# filter out short words (3 characters or fewer), e.g. 'the', 'or', 'and', which might bias our result
In [10]: words = words.filter(lambda x: len(x) > 3)
# map each word to a (word, 1) pair
In [11]: words = words.map(lambda w: (w,1))
# reduce by key: sum the counts for each word
In [12]: words = words.reduceByKey(add)
# swap to (count, word) pairs and sort by count in descending order
In [13]: words = words.map(lambda x: (x[1], x[0])).sortByKey(False)
# display top 20 words by frequency
In [14]: words.take(20)
Out[14]:
[(39, u'data'),
 (19, u'test'),
 (15, u'research'),
 (12, u'this'),
 (12, u'grouplens'),
 (12, u'base'),
 (11, u'user'),
 (10, u'from'),
 (10, u'information'),
 (9, u'project'),
 (9, u'sets'),
 (8, u'filtering'),
 (8, u'users'),
 (6, u'collaborative'),
 (6, u'ratings'),
 (5, u'university'),
 (5, u'list'),
 (5, u'training'),
 (5, u'minnesota'),
 (5, u'movielens')]
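
For the record, the filter/map/reduceByKey/sortByKey/take chain above can also be collapsed into a single expression with takeOrdered; this sketch (not run on the cluster) should return the same top 20 words, just as (word, count) pairs instead of (count, word):

# same word-count pipeline in one chain, ordering directly by descending count
top20 = file_in.flatMap(lambda line: re.split('\W+', line.lower().strip())) \
               .filter(lambda x: len(x) > 3) \
               .map(lambda w: (w, 1)) \
               .reduceByKey(add) \
               .takeOrdered(20, key=lambda x: -x[1])
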
We will continue with a more in-depth demonstration of how to combine AWS S3, Data Pipeline, the ELK stack (Elasticsearch, Logstash and Kibana) and Redshift for web server log analysis, visualization and loading into Redshift.
Note: Don't forget to terminate your cluster after testing. The above should cost you less than USD 0.10.

Liew Keong Han, 9.42pm 29 May 2016.
