Distributed SQL Query using SparkSQL, HDFS and Sqoop

Spark SQL: A brief introduction

Spark SQL is a component of the Spark framework. It allows you to manipulate big unstructured data files and extract useful information using SQL.
It introduces a new data abstraction called DataFrames, allowing the analysis of structured and semi-structured data.
Spark SQL provides APIs in Scala, Python and Java to manipulate DataFrames.
It also provides support for the SQL language, a command line interface and an ODBC/JDBC server.
The example described in this post shows how to write a simple Spark application that executes an SQL query using Spark SQL.
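As a minimal illustration of the DataFrame API described above (a sketch, assuming a Spark 1.x environment; the table and column names are only examples):

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local", "DataFrame intro")
sqlContext = SQLContext(sc)

# Build a small DataFrame from an in-memory RDD of Row objects
people = sc.parallelize([Row(name="Alice", age=34), Row(name="Bob", age=29)])
df = sqlContext.createDataFrame(people)

# Register the DataFrame as a temporary table and query it with plain SQL
df.registerTempTable("people")
for row in sqlContext.sql("SELECT name FROM people WHERE age > 30").collect():
    print(row.name)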

 

Import MySQL data into HDFS

In this paragraph I show how to import a MySQL database into Hadoop using Sqoop; in the following paragraph I use the data loaded in HDFS to execute an SQL query.

I’m using the “world” database that can be downloaded from https://dev.mysql.com/doc/index-other.html. It contains data about cities and countries around the world and the languages spoken in each country.
(Figure: entity-relationship diagram of the world database)

I import all the tables of the world database into HDFS, using as output format text files with fields separated by the tab character. The following command imports all the tables into the HDFS directory /user/cloudera/world.

[cloudera@quickstart ~]$ sqoop import-all-tables --connect jdbc:mysql://localhost/world --username root -P --warehouse-dir  /user/cloudera/world  --fields-terminated-by '\t'
As you can observe in the output of the following commands, Sqoop has created a subdirectory for each MySQL table and has split each table's data into several files sharing the prefix "part-m-".
[cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/world
Found 3 items
drwxr-xr-x   - cloudera cloudera          0 2016-02-29 21:54 /user/cloudera/world/City
drwxr-xr-x   - cloudera cloudera          0 2016-02-29 21:55 /user/cloudera/world/Country
drwxr-xr-x   - cloudera cloudera          0 2016-02-29 21:55 /user/cloudera/world/CountryLanguage

[cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/world/City
Found 5 items
-rw-r--r--   1 cloudera cloudera          0 2016-02-29 21:54 /user/cloudera/world/City/_SUCCESS
-rw-r--r--   1 cloudera cloudera      37088 2016-02-29 21:54 /user/cloudera/world/City/part-m-00000
-rw-r--r--   1 cloudera cloudera      35361 2016-02-29 21:54 /user/cloudera/world/City/part-m-00001
-rw-r--r--   1 cloudera cloudera      35884 2016-02-29 21:54 /user/cloudera/world/City/part-m-00002
-rw-r--r--   1 cloudera cloudera      36148 2016-02-29 21:54 /user/cloudera/world/City/part-m-00003

[cloudera@quickstart ~]$ hadoop fs -cat  /user/cloudera/world/City/part-m-00000|head
1       Kabul   AFG     Kabol   1780000
2       Qandahar        AFG     Qandahar        237500
3       Herat   AFG     Herat   186800
4       Mazar-e-Sharif  AFG     Balkh   127800
5       Amsterdam       NLD     Noord-Holland   731200
6       Rotterdam       NLD     Zuid-Holland    593321
7       Haag    NLD     Zuid-Holland    440900
8       Utrecht NLD     Utrecht 234323
9       Eindhoven       NLD     Noord-Brabant   201843
10      Tilburg NLD     Noord-Brabant   193238

Spark SQL application

This paragraph describes the simple application that I wrote in order to execute an SQL query, using Spark SQL, on the HDFS data imported in the previous paragraph.

The SQL query joins the three tables of the database and extracts the top ten country names and their capitals, ordered by life expectancy, for countries where more than 50% of the people speak English.

First of all, I created the function “loadCSV” to load the data from HDFS into my application. This function accepts three parameters: the HDFS location, the name of the table to register in Spark SQL and a lambda function mapping each field of the HDFS text files to a specific type (string, integer, float) of the table. It reads the text files in the HDFS location, splits each line on the tab character to extract the content of each field and, at the end, registers the data as a Spark temporary table.

Then I execute the query described above and save the result to a Parquet file.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local", "Simple SparkSQL Join")

sqlContext = SQLContext(sc)

def loadCSV(path,tableName,function):
    # Read the text files from HDFS, split each line on the tab character,
    # map the fields to typed columns using the provided lambda and register
    # the resulting DataFrame as a temporary table usable from Spark SQL.
    csv = sc.textFile(path)
    columns = csv.map(lambda l: l.split("\t"))
    table = columns.map(function)
    schema = sqlContext.createDataFrame(table)
    schema.registerTempTable(tableName)

loadCSV("/user/giovanni/world/City/*","City",
    lambda p:
        Row(
            id=int(p[0]),
            name=p[1],
            countrycode=p[2],
            district=p[3],
            population= int(p[4])
        )
    )
loadCSV("/user/giovanni/world/CountryLanguage/*","CountryLanguage",
    lambda p:
        Row(countrycode=p[0],
            language=p[1],
            isofficial=p[2],
            percentage=float(p[3])
        )
    )
loadCSV("/user/giovanni/world/Country/*","Country",
    lambda p:
        Row(code=p[0],
            name=p[1],
            continent=p[2],
            region=p[3],
            surfacearea=None if p[4]=='null' else float(p[4]),
            indepyear=None if p[5]=='null' else int(p[5]),
            population=int(p[6]),
            lifeexpectancy=None if p[7]=='null' else float(p[7]),
            gnp=float(p[8]),
            gnpold=None if p[9]=='null' else float(p[9]),
            localname=p[10],
            governmentform=p[11],
            headofstate=p[12],
            capital=None if p[13]=='null' else int(p[13]),
            code2=p[14]
        )
    )

# Top ten countries, with their capitals, ordered by life expectancy,
# restricted to countries where English is spoken by more than 50% of the population
outputResult = sqlContext.sql(
"""SELECT  Country.name as CountryName,
           Country.lifeexpectancy as CountryLifeExpectancy,
           City.name as CapitalName,
           CountryLanguage.language
    FROM
           Country JOIN City on Country.capital = City.id
    JOIN
           CountryLanguage ON CountryLanguage.countrycode = Country.code
    WHERE  CountryLanguage.language ="English"
           AND CountryLanguage.percentage > 50
    ORDER BY CountryLifeExpectancy desc limit 10""")

outputResult.save("sparkSQLResult")

Execute the application and view the result

To run the application on my Hadoop cluster I simply saved the source code described in the previous paragraph in the file spark_sql.py and ran it using the command spark-submit.

The next snippet uses parquet-tools to view the result stored in the Parquet file in a human-readable format.

[cloudera@quickstart ~]$ spark-submit /home/cloudera/workspace/pyspark_examples/spark_sql.py
[cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/sparkSQLResult
Found 4 items
-rw-r--r--   1 cloudera cloudera          0 2016-02-29 22:07 /user/cloudera/sparkSQLResult/_SUCCESS
-rw-r--r--   1 cloudera cloudera        496 2016-02-29 22:07 /user/cloudera/sparkSQLResult/_common_metadata
-rw-r--r--   1 cloudera cloudera        866 2016-02-29 22:07 /user/cloudera/sparkSQLResult/_metadata
-rw-r--r--   1 cloudera cloudera       1371 2016-02-29 22:07 /user/cloudera/sparkSQLResult/part-r-00001.parquet

[root@quickstart ~]# hadoop parquet.tools.Main cat /user/cloudera/sparkSQLResult
CountryName = Australia
CountryLifeExpectancy = 79.8
CapitalName = Canberra
language = English

CountryName = Canada
CountryLifeExpectancy = 79.4
CapitalName = Ottawa
language = English

CountryName = Gibraltar
CountryLifeExpectancy = 79.0
CapitalName = Gibraltar
language = English

CountryName = Virgin Islands, U.S.
CountryLifeExpectancy = 78.1
CapitalName = Charlotte Amalie
language = English

CountryName = New Zealand
CountryLifeExpectancy = 77.8
CapitalName = Wellington
language = English

CountryName = United Kingdom
CountryLifeExpectancy = 77.7
CapitalName = London
language = English

CountryName = United States
CountryLifeExpectancy = 77.1
CapitalName = Washington
language = English

CountryName = Bermuda
CountryLifeExpectancy = 76.9
CapitalName = Hamilton
language = English

CountryName = Ireland
CountryLifeExpectancy = 76.8
CapitalName = Dublin
language = English

CountryName = Belize
CountryLifeExpectancy = 70.9
CapitalName = Belmopan
language = English
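If you prefer to inspect the result from PySpark instead of parquet-tools, the Parquet directory can be read back into a DataFrame. This is a minimal sketch, assuming the same Spark 1.3 environment and that the sparkSQLResult directory is in the user's HDFS home:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "Read SparkSQL result")
sqlContext = SQLContext(sc)

# Load the Parquet files written by spark_sql.py and print the rows
result = sqlContext.parquetFile("sparkSQLResult")
result.show()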

Apache Spark – Distributed computation of π in 8 lines of Python code

In this post I show how to write a distributed application that computes an approximation of the number pi, using a Spark application written in Python.

A brief introduction to Spark

Apache Spark is an open-source cluster computing framework that helps developers create distributed applications.
For certain applications Spark provides performance up to 100 times faster than Hadoop's disk-based MapReduce paradigm. It allows you to load data into a cluster's memory and query it repeatedly using different programming languages such as Python, Scala or Java.

A brief introduction to the Leibniz formula for pi

This formula, also called the Leibniz series or the Gregory–Leibniz series, was discovered in the 17th century by Gottfried Leibniz (https://en.wikipedia.org/wiki/Gottfried_Wilhelm_Leibniz) and James Gregory (https://en.wikipedia.org/wiki/James_Gregory_(mathematician)).
The formula, using summation notation, is:

\[ \frac{\pi}{4} = \sum_{n=0}^{\infty} \frac{(-1)^n}{2n+1} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \cdots \]

Summing this infinite series of numbers gives the value of pi divided by 4.

If you sum a finite number of terms of the series you obtain an approximation of pi divided by 4; increasing the number of terms gives a better approximation.

Further details about this formula are present in the Wikipedia page.
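For example, a plain-Python sketch (no Spark required) shows how the partial sums converge as more terms are added:

partial = 0.0
for n in range(10000):
    # n-th term of the Leibniz series: (-1)^n / (2n + 1)
    partial += (1 if n % 2 == 0 else -1) / float(2 * n + 1)
    if n + 1 in (10, 100, 1000, 10000):
        print("%5d terms -> pi is approximately %f" % (n + 1, partial * 4))

With 10000 terms this prints a value around 3.141493, the same result produced by the Spark job shown later in this post.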

Pi approximation using Apache Spark

The following code computes the pi approximation:

from pyspark import SparkContext
sc = SparkContext("local", "Pi Leibniz approximation")
iteration=10000
partition=4
data = range(0,iteration)
distIn = sc.parallelize(data,partition)
result=distIn.map(lambda n:(1 if n%2==0 else -1)/float(2*n+1)).reduce(lambda a,b: a+b)
print "Pi is %f" % (result*4)

 

This source code computes the first 10000 terms of the Leibniz series and sums them. The result is multiplied by 4 in order to obtain the approximation of pi.
In order to compute the 10000 terms of the series, a sequence of integers between 0 and 9999 is generated and stored in the distIn variable. This sequence of numbers is split into 4 partitions that can be processed separately by different servers.
In order to compute the i-th element of the series I used the following map function: “lambda n: (1 if n % 2 == 0 else -1)/float(2*n+1)”.
The reduce function simply sums the elements of the series.

In order to write the map and the reduce functions I used Python's lambda feature; I wrote a post about Python lambda functions on this blog.
The result obtained from the reduce function is multiplied by 4 and printed to the standard output.
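For readers less familiar with lambdas, the map function above is just a compact anonymous function; a small equivalent sketch using a named function:

def leibniz_term(n):
    # Same logic as the lambda passed to map(): (-1)^n / (2*n + 1)
    return (1 if n % 2 == 0 else -1) / float(2 * n + 1)

print(leibniz_term(0))   # 1.0
print(leibniz_term(1))   # -0.333333...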

Development and execution environment

In order to develop and execute this application I used Spark 1.3.0.

This version of Spark is included in the Cloudera QuickStart VM, a virtual machine appliance that contains a test environment for Hadoop/Spark clusters.

Submitting the code described in this post using this environment is very simple.

You have to write the source code in a file called, for example, leibniz_pi.py and execute it by running the command:

spark-submit leibniz_pi.py

The result of the execution, shown on the standard output, is:

Pi is 3.141493
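As a quick sanity check (a sketch using only the Python standard library), the approximation can be compared with the value of pi provided by the math module:

import math

# 3.141493 is the value printed by the Spark job above
print("math.pi       = %f" % math.pi)
print("approximation = %f" % 3.141493)
print("difference    = %f" % abs(math.pi - 3.141493))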

Import MySQL data into an Elasticsearch server

Elasticsearch is a near real-time search server based on Lucene. It allows you to build a distributed full-text search engine. It's open-source software developed in Java and offers a REST API to insert, retrieve and search data.

In this post I describe how to import data from a MySQL database into an Elasticsearch search engine using the library https://github.com/jprante/elasticsearch-jdbc

I have already installed a MySQL server and an Elasticsearch server; you can find plenty of documentation on the internet about the installation of this software.

I use the MySQL example database “world” provided by MySQL. It can be downloaded from the following URL: http://downloads.mysql.com/docs/world.sql.gz

This image shows the entity-relationship model of the database.

(Figure: entity-relationship diagram of the world database)

Step 1

The following text box shows the commands used to import the example database into MySQL.

root@ubuntu01:~/database_example# wget http://downloads.mysql.com/docs/world.sql.gz

root@ubuntu01:~/database_example# gunzip world.sql.gz
root@ubuntu01:~/database_example# ls
world.sql

root@ubuntu01:~/database_example# mysql -uroot -p < world.sql

Step 2

Download the elasticsearch-jdbc library, create the script “mysql-import-world.sh” shown in the following text box and run it in order to import the data from MySQL into Elasticsearch.

The script contains several parameters:

  • MySQL database connection data (IP, port, database name, username, password)
  • The SQL query executed to extract the data. In my example I use this query:
    SELECT City.ID as _id,
           City.Name,
           City.District,
           City.Population,
           Country.Name as CountryName,
           Country.continent as CountryContinent
    FROM City JOIN Country
    ON City.CountryCode = Country.Code;
  • Elasticsearch connection data (IP, port)
  • The name of the index created on Elasticsearch
root@ubuntu01:~# wget http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.1.0.0/elasticsearch-jdbc-2.1.0.0-dist.zip

root@ubuntu01:~# unzip elasticsearch-jdbc-2.1.0.0-dist.zip
root@ubuntu01:~# cd elasticsearch-jdbc-2.1.0.0/bin/

root@ubuntu01:~/elasticsearch-jdbc-2.1.0.0/bin# cat ./mysql-import-world.sh
#!/bin/bash
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib

echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://172.17.0.101:3306/world",
"user" : "root",
"password" : "password",
"sql" : "select City.ID as _id,City.Name,City.District,City.Population,Country.Name as CountryName, Country.continent as CountryContinent from City JOIN Country ON City.CountryCode = Country.Code;",
"treat_binary_as_string" : true,
"elasticsearch" : {
"cluster" : "elasticsearch",
"host" : "172.17.0.101",
"port" : 9300
},
"max_bulk_actions" : 20000,
"max_concurrent_bulk_requests" : 10,
"index" : "world"
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter

root@ubuntu01:~/elasticsearch-jdbc-2.1.0.0/bin# ./mysql-import-world.sh

Step 3

Finally, execute a query against the Elasticsearch server to verify that the new index "world" has been created, and a second query to retrieve some documents from this index.

root@ubuntu01:~/elasticsearch-jdbc-2.1.0.0/bin# curl 'http://localhost:9200/_cat/indices?v'
 health status index pri rep docs.count docs.deleted store.size pri.store.size
 green open world 5 1 4079 0 1.1mb 1.1mb
 green open settings 5 1 0 0 650b 650b

root@ubuntu01:~/elasticsearch-jdbc-2.1.0.0/bin# curl -XGET 'localhost:9200/world/_search?size=3&pretty=true'
 {
 "took" : 2,
 "timed_out" : false,
 "_shards" : {
 "total" : 5,
 "successful" : 5,
 "failed" : 0
 },
 "hits" : {
 "total" : 4079,
 "max_score" : 1.0,
 "hits" : [ {
 "_index" : "world",
 "_type" : "jdbc",
 "_id" : "129",
 "_score" : 1.0,
 "_source":{"Name":"Oranjestad","District":"Aruba","Population":29034,"CountryName":"Aruba","CountryContinent":"North America"}
 }, {
 "_index" : "world",
 "_type" : "jdbc",
 "_id" : "60",
 "_score" : 1.0,
 "_source":{"Name":"Namibe","District":"Namibe","Population":118200,"CountryName":"Angola","CountryContinent":"Africa"}
 }, {
 "_index" : "world",
 "_type" : "jdbc",
 "_id" : "73",
 "_score" : 1.0,
 "_source":{"Name":"Lomas de Zamora","District":"Buenos Aires","Population":622013,"CountryName":"Argentina","CountryContinent":"South America"}
 } ]
 }
 }
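As an alternative to curl, the same REST API can be queried from Python. This is a minimal sketch, assuming the Elasticsearch server configured in the previous steps is reachable on localhost:9200 and that the world index has been populated:

try:
    from urllib.request import urlopen   # Python 3
except ImportError:
    from urllib2 import urlopen          # Python 2

# Retrieve three documents of the world index whose CountryName field matches "Italy"
url = "http://localhost:9200/world/_search?q=CountryName:Italy&size=3&pretty=true"
response = urlopen(url)
print(response.read().decode("utf-8"))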

 

There is also software, like Kibana and Grafana, that provides dashboards which are really useful for querying an Elasticsearch server and showing the data in a web interface.

Flume: Import apache logs in hadoop hdfs

Flume is an Apache Software Foundation project used to import streams of data into a centralized data store. In Hadoop environments Flume is used to import data into Hadoop clusters from different data sources.

In this post I show how to use Flume to import Apache web server logs (access_log and error_log) into the Hadoop HDFS filesystem.

A Flume agent is composed of a set of sources, channels and sinks:

  • sources are used to collect data/events from different data sources
  • channels are the communication media used to temporarily store the events collected by the sources
  • sinks asynchronously read the events from the channel and send them to a destination

Flume supports different types of sources, channels and sinks. The complete list of sources, channels and sinks already implemented can be found in the documentation (https://flume.apache.org/FlumeUserGuide.html).

In my example, in order to import the Apache web server logs, I use the following Flume components:

  • Exec source: it runs a Unix command and expects the process to produce data on its standard output
  • Memory channel: this channel implements an in-memory queue for the events
  • HDFS sink: it allows writing text files to HDFS

The following image shows the architecture of the flume agent used in this example.

(Figure: Flume agent architecture: two exec sources, one memory channel and one HDFS sink)

The Flume agent requires a configuration file defining the sources, channels and sinks used by the agent and their properties. The following text box shows the Flume configuration file that I used to import the Apache web server logs into Hadoop:


##### /home/cloudera/example-1.conf
# Name the components on this agent
# I define two sources:
# a source for the access log file
# a source for the error log file
agent1.sources = tailAccessSource tailErrorSource
# I define one sink
agent1.sinks = hdfsSink
# I define one channel
agent1.channels = memChannel01

# Bind the source and sink to the channel
# Both sources will use the memory channel 
agent1.sources.tailAccessSource.channels = memChannel01
agent1.sources.tailErrorSource.channels = memChannel01
agent1.sinks.hdfsSink.channel = memChannel01


# Define the type and options for each sources
agent1.sources.tailAccessSource.type = exec
agent1.sources.tailAccessSource.command = tail -F /var/log/httpd/access_log

agent1.sources.tailErrorSource.type = exec
agent1.sources.tailErrorSource.command = tail -F /var/log/httpd/error_log

# Define the type and options for the channel
agent1.channels.memChannel01.type = memory
agent1.channels.memChannel01.capacity = 100000
agent1.channels.memChannel01.transactionCapacity = 10000


# Define the type and options for the sink
# Note: namenode is the hostname of the hadoop namenode server
#       flume/data-example.1/ is the directory where the apache logs will be stored
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://namenode/flume/data-example.1/
agent1.sinks.hdfsSink.hdfs.fileType = DataStream
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.rollSize = 0
agent1.sinks.hdfsSink.hdfs.rollInterval = 60

The import of the Apache logs can be started by running Flume with its configuration file as argument and waiting for the Apache web server to start producing logs:

[cloudera@quickstart ~]$ flume-ng agent --conf conf --conf-file /home/cloudera/example-1.conf --name agent1 -Dflume.root.logger=DEBUG,console

HA two node GPFS cluster with tie-breaker disk

In a previous post I described how to configure a GPFS cluster filesystem (a filesystem that can be mounted by two or more servers simultaneously).
This article describes the changes required to enable a high-availability configuration for a GPFS cluster filesystem. This configuration allows each node to read and write the filesystem when the other node is down.

The physical server architecture, shown in the following figure, remains the same:
– two CentOS servers
– two disks shared between the servers

(Figure: two CentOS servers, gpfs01 and gpfs02, sharing two disks)

The output of the command mmlscluster shows that only the first GPFS node has been assigned the manager and quorum node roles. In order to enable high availability, both servers must have these two roles.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmlscluster

GPFS cluster information
========================
  GPFS cluster name:         gpfs01
  GPFS cluster id:           14526312809412325839
  GPFS UID domain:           gpfs01
  Remote shell command:      /usr/bin/ssh
  Remote file copy command:  /usr/bin/scp
  Repository type:           CCR

 Node  Daemon node name  IP address    Admin node name  Designation
--------------------------------------------------------------------
   1   gpfs01            172.17.0.101  gpfs01           quorum-manager
   2   gpfs02            172.17.0.102  gpfs02

The filesystem fs_gpfs01 is composed of two network shared disks. In this post I'll show how to configure these two disks as tie-breaker disks in order to enable high availability.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmlsnsd -a

File system Disk name NSD servers
---------------------------------------------------------------------------
fs_gpfs01 mynsd1 (directly attached)
fs_gpfs01 mynsd2 (directly attached)

Indeed, like many other cluster software products, GPFS requires that the majority of quorum nodes be online in order to use the filesystem and avoid split brain.
In this case the cluster is composed of an even number of quorum nodes, so one or more tie-breaker disks must be defined.
More details about GPFS reliability configuration can be found in this document: http://www-03.ibm.com/systems/resources/configure-gpfs-for-reliability.pdf

As described above, I assign the manager and quorum roles to node gpfs02 and verify the change using the command mmlscluster.

[root@gpfs01 ~]# mmchnode --manager -N gpfs02
Thu May 7 22:11:20 CEST 2015: mmchnode: Processing node gpfs02
mmchnode: Propagating the cluster configuration data to all
affected nodes. This is an asynchronous process.

[root@gpfs01 ~]# mmchnode --quorum -N gpfs02
Thu May 7 22:11:20 CEST 2015: mmchnode: Processing node gpfs02
mmchnode: Propagating the cluster configuration data to all
affected nodes. This is an asynchronous process.
[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmlscluster

GPFS cluster information
========================
  GPFS cluster name:         gpfs01
  GPFS cluster id:           14526312809412325839
  GPFS UID domain:           gpfs01
  Remote shell command:      /usr/bin/ssh
  Remote file copy command:  /usr/bin/scp
  Repository type:           CCR

 Node  Daemon node name  IP address    Admin node name  Designation
--------------------------------------------------------------------
   1   gpfs01            172.17.0.101  gpfs01           quorum-manager
   2   gpfs02            172.17.0.102  gpfs02           quorum-manager

I configure both NSDs as tie-breaker disks and verify the configuration using the command mmlsconfig.

[root@gpfs01 ~]# mmchconfig tiebreakerDisks="mynsd1;mynsd2"

[root@gpfs01 ~]# mmlsconfig
Configuration data for cluster gpfs01:
--------------------------------------
clusterName gpfs01
clusterId 14526312809412325839
autoload no
dmapiFileHandleSize 32
minReleaseLevel 4.1.0.4
ccrEnabled yes
tiebreakerDisks mynsd1;mynsd2
adminMode central

File systems in cluster gpfs01:
-------------------------------
/dev/fs_gpfs01

Now the GPFS HA configuration is complete. I can shut down one node and verify that the other node can still read and write the GPFS filesystem.

[root@gpfs01 ~]# mmmount /fs_gpfs01 -a
Thu May 7 22:22:42 CEST 2015: mmmount: Mounting file systems ...

[root@gpfs02 ~]# ssh gpfs01 shutdown -h now
[root@gpfs02 ~]# cd /fs_gpfs01/
[root@gpfs02 fs_gpfs01]# ls -latr
dr-xr-xr-x 2 root root 8192 Jan 1 1970 .snapshots
[root@gpfs02 fs_gpfs01]# ls -latr
total 1285
dr-xr-xr-x 2 root root 8192 Jan 1 1970 .snapshots
drwxr-xr-x 2 root root 262144 May 7 21:49 .
-rw-r--r-- 1 root root 1048576 May 7 21:50 test1M
dr-xr-xr-x. 24 root root 4096 May 7 21:55 ..

Furthermore, the log in /var/log/messages provides more details about this event. The log below, captured on node gpfs02 when I shut down node gpfs01, shows that gpfs02 detected the failure of gpfs01 and was elected cluster manager.

# /var/log/messages
...
May 7 22:25:29 gpfs02 mmfs: [E] CCR: failed to connect to node 172.17.0.101:1191 (sock 42 err 1143)
May 7 22:25:39 gpfs02 mmfs: [E] CCR: failed to connect to node 172.17.0.101:1191 (sock 42 err 1143)
May 7 22:25:39 gpfs02 mmfs: [E] Node 172.17.0.101 (gpfs01) is being expelled due to expired lease.
May 7 22:25:39 gpfs02 mmfs: [N] This node (172.17.0.102 (gpfs02)) is now Cluster Manager for gpfs01.

Install Cloudera prerequisites with Ansible

Ansible is open-source software for configuring and managing server infrastructures. It allows multi-node software deployment, ad hoc task execution and configuration management.
In this post I show how to use Ansible to deploy some Cloudera Hadoop 5 prerequisites to a large set of servers.

Ansible is a lightweight alternative to other open-source configuration management tools like Puppet. It doesn't need any agent installed on the managed nodes; it only requires an ssh connection from the Ansible server to the managed servers and the python package installed on them.

In this tutorial two types of Ansible configuration files will be used:

  • hosts file: it defines the list of hosts managed by Ansible
  • playbook: it contains a list of configurations that can be deployed to a server or a group of servers

Cloudera Hadoop requires the following prerequisites on each node:
– the file /etc/hosts of each node containing consistent information about hostnames and IP addresses across all hosts
– SELinux disabled
– IPv6 disabled
– the vm.swappiness kernel option set to a value less than or equal to 10
– a password-less ssh connection for the root user with a unique private key

This configuration must be deployed to every Cloudera server. The manual execution of these tasks over a large Hadoop infrastructure could be a time-consuming activity.
Ansible can automate all these operations.

The example shown in this post uses a host group of 4 Cloudera servers, but it can easily be scaled to hundreds of servers.

Each node in my configuration runs the CentOS 6.5 operating system and has the python and libselinux-python RPM packages installed.
The libselinux-python package is required to disable the SELinux configuration; it could also be installed by Ansible.

I install Ansible on the first host of my host group.

[root@cloudera01 ~]# yum install epel-release
[root@cloudera01 ~]# yum install ansible

I modify the Ansible hosts file /etc/ansible/hosts as shown below. This file defines a host group named “cloudera” and lists its hostnames (these hostnames should be defined in /etc/hosts or in a DNS server).

[root@cloudera01 ~]# cat /etc/ansible/hosts
[cloudera]
cloudera01
cloudera02
cloudera03
cloudera04

I create a playbook in /root/cloudera-prerequisites.yml as shown in the following text box.

Playbooks are text files in YAML format containing a set of orchestration steps.

This playbook defines:

  • the group of servers involved in the orchestration
  • the remote username used for the ssh connection
  • a list of orchestration steps (tasks) that must be executed on each server of the cloudera group. Each task defines an Ansible module and its parameters. You can find more details about Ansible modules in the Ansible documentation: http://docs.ansible.com/modules.html
[root@cloudera01 ~]# cat cloudera-prerequisites.yml
- hosts: cloudera
  remote_user: root
  tasks:
    - selinux: state=disabled
    - sysctl: name=net.ipv6.conf.all.disable_ipv6 value=1 state=present
    - sysctl: name=net.ipv6.conf.default.disable_ipv6 value=1 state=present
    - sysctl: name=vm.swappiness value=10 state=present
    - authorized_key: user=root key="{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
    - copy: src=/etc/hosts dest=/etc/hosts owner=root group=root mode=0644

Now I'm ready to execute my playbook, applying the Cloudera prerequisites to each node of the cloudera host group.
I launch the command ansible-playbook using the playbook as argument and the flag “-k”.
The -k flag allows entering the root password of the cloudera hosts interactively.
An Ansible playbook can also be executed in environments where a password-less ssh connection is already configured or where the hosts have different root passwords.

[root@cloudera01 ~]# ansible-playbook cloudera-prerequisites.yml -k
SSH password: ## Insert root password of all cloudera hosts

PLAY [cloudera] ***************************************************************

GATHERING FACTS ***************************************************************
ok: [cloudera04]
ok: [cloudera01]
ok: [cloudera03]
ok: [cloudera02]

TASK: [selinux state=disabled] ************************************************
changed: [cloudera04]
changed: [cloudera03]
changed: [cloudera01]
changed: [cloudera02]

TASK: [sysctl name=net.ipv6.conf.all.disable_ipv6 value=1 state=present] ******
ok: [cloudera01]
ok: [cloudera04]
ok: [cloudera03]
ok: [cloudera02]

TASK: [sysctl name=net.ipv6.conf.default.disable_ipv6 value=1 state=present] ***
ok: [cloudera04]
ok: [cloudera03]
ok: [cloudera01]
ok: [cloudera02]

TASK: [authorized_key user=root key="ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA8aFJa2vXcrt42PmzIT8/9rFc4JQHS7ElV7p11l7KrV3Kq9IqnPaWei+u6zJ0zTW/J1DvOalzzT23tMakAMPpsKm/LEAQnvKA3Ytc0K+vtHH7tJaAB0QJAoq2rBocj7R+RJtnU8VvQxRyCYELDYoTLLjCKBjvyDN7908ojuuqHdb4LpIiTnge5WcofpeD64P1J4PN6sYAu+nTC/ykg4a75iiuyoWuocwfRgS9i1aFdyHHnY40rB8/Er+vzn9bQRbNTYjwo8kEaQt1ZM4ZRjzhM3gUUwM0JUjeSDN3soA+Dq4tW052nxiL5xEWsCcTLcy5cd6fChzEQShPP8xnee8btw== root@cloudera01.example.com"] ***
ok: [cloudera01]
ok: [cloudera03]
ok: [cloudera02]
ok: [cloudera04]

TASK: [copy src=/etc/hosts dest=/etc/hosts owner=root group=root mode=0644] ***
ok: [cloudera02]
ok: [cloudera01]
ok: [cloudera03]
ok: [cloudera04]

PLAY RECAP ********************************************************************
cloudera01 : ok=6 changed=1 unreachable=0 failed=0
cloudera02 : ok=6 changed=1 unreachable=0 failed=0
cloudera03 : ok=6 changed=1 unreachable=0 failed=0
cloudera04 : ok=6 changed=1 unreachable=0 failed=0

Reboot each system in order to apply the SELinux configuration.

When a playbook is executed, Ansible generates a report containing detailed information about the execution of each task.
In this case Ansible informs you that a reboot of each node is required in order to apply the SELinux configuration.

Ansible configurations are idempotent: the same playbook can be reapplied to an already configured node without modifying any configuration.
This feature can be useful to check configuration changes or when a new host is added to a host group.

Install and configure GPFS 4.1 filesystem on Linux Centos 6.6

The General Parallel File System (GPFS) is a high-performance clustered file system developed by IBM. It can be deployed in a shared-disk or in a shared-nothing architecture. It's used by many large companies and in several supercomputers on the Top 500 list.

GPFS allows you to configure a highly available filesystem with concurrent access from a cluster of nodes.
Cluster nodes can be servers running the AIX, Linux or Windows operating system.

GPFS provides high performance by striping blocks of data over multiple disks and reading and writing these blocks in parallel. It also offers block replication over different disks in order to guarantee the availability of the filesystem during disk failures.

The following list contains some of the most interesting features of GPFS:

  • Provides a POSIX-compliant interface
  • Allows filesystem mounting by clients accessing data through a LAN connection
  • Many filesystem maintenance tasks can be performed while the filesystem is mounted
  • Supports quotas
  • Distributes metadata over different disks
  • Has really high scalability limits

This post describes the steps required to implement a basic configuration of a GPFS filesystem on a cluster composed of two CentOS 6.6 servers.

(Figure: two servers, gpfs01 and gpfs02, sharing the disk devices sdb and sdc)

The server architecture shown in the image is composed of two servers, gpfs01 and gpfs02, sharing two disk devices (sdb and sdc), each of 10 GB. Each server has an ethernet connection on the subnet 172.17.0.0/16 allowing the communication between the GPFS cluster nodes. The GPFS version used for the installation is 4.1.0-5.

The following sequence of tasks shows how to install and configure GPFS on this pair of servers.

1) Install GPFS rpm

Install the GPFS RPMs on both nodes.

[root@gpfs01 gpfs_repo]# find $PWD
/root/gpfs_repo
/root/gpfs_repo/gpfs.ext-4.1.0-5.x86_64.rpm
/root/gpfs_repo/gpfs.docs-4.1.0-5.noarch.rpm
/root/gpfs_repo/kmod-gpfs-4.1.0-5.15.sdl6.x86_64.rpm
/root/gpfs_repo/gpfs.base-4.1.0-5.0.1.x86_64.rpm
/root/gpfs_repo/gpfs.gskit-8.0.50-32.x86_64.rpm
/root/gpfs_repo/gpfs.msg.en_US-4.1.0-5.noarch.rpm
/root/gpfs_repo/gpfs.gpl-4.1.0-5.noarch.rpm

[root@gpfs02 gpfs_repo]# yum localinstall *.rpm

[root@gpfs02 gpfs_repo]# find $PWD
/root/gpfs_repo
/root/gpfs_repo/gpfs.ext-4.1.0-5.x86_64.rpm
/root/gpfs_repo/gpfs.docs-4.1.0-5.noarch.rpm
/root/gpfs_repo/kmod-gpfs-4.1.0-5.15.sdl6.x86_64.rpm
/root/gpfs_repo/gpfs.base-4.1.0-5.0.1.x86_64.rpm
/root/gpfs_repo/gpfs.gskit-8.0.50-32.x86_64.rpm
/root/gpfs_repo/gpfs.msg.en_US-4.1.0-5.noarch.rpm
/root/gpfs_repo/gpfs.gpl-4.1.0-5.noarch.rpm

[root@gpfs01 gpfs_repo]# yum localinstall *.rpm

2) Configure /etc/hosts

Configure hosts file on both nodes in order to allow name resolution.

[root@gpfs01 gpfs_repo]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.17.0.101 gpfs01 gpfs01.example.com
172.17.0.102 gpfs02 gpfs02.example.com

[root@gpfs02 gpfs_repo]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.17.0.101 gpfs01 gpfs01.example.com
172.17.0.102 gpfs02 gpfs02.example.com

3) Exchange root ssh key between nodes

GPFS requires that each cluster node can execute ssh commands on all the other nodes as the root user in order to allow remote administration. To allow this you have to exchange the root ssh keys between the cluster nodes.

[root@gpfs01 ~]# ssh-copy-id root@gpfs01
[root@gpfs01 ~]# ssh-copy-id root@gpfs02
[root@gpfs02 ~]# ssh-copy-id root@gpfs01
[root@gpfs02 ~]# ssh-copy-id root@gpfs02

4) Test ssh password-less connection

Verify the previous step by executing an ssh connection between each pair of nodes.

[root@gpfs01 ~]# ssh gpfs01 date
Sat Apr 18 10:52:08 CEST 2015

[root@gpfs01 ~]# ssh gpfs02 date
Sat Apr 18 10:52:09 CEST 2015

[root@gpfs02 ~]# ssh gpfs01 date
Mon Apr 13 21:44:52 CEST 2015

[root@gpfs02 ~]# ssh gpfs02 date
Mon Apr 13 21:44:53 CEST 2015

5) Compile GPFS portability layer rpm

The GPFS portability layer is a loadable kernel module that allows the GPFS daemon to interact with the operating system.

IBM provides the source code of this module; it must be compiled for the kernel version used by your servers. This step can be executed on a single node, then the RPM containing the kernel module can be distributed and installed on all the other GPFS nodes. In this example the module is compiled on server gpfs01.

In order to avoid the error “Cannot determine the distribution type. /etc/redhat-release is present, but the release name is not recognized. Specify the distribution type explicitly.” during module compilation, replace the content of /etc/redhat-release with the string “Red Hat Enterprise Linux Server release 6.6 (Santiago)”.

[root@gpfs01 src]# mv /etc/redhat-release /etc/redhat-release.original
[root@gpfs01 src]# echo "Red Hat Enterprise Linux Server release 6.6 (Santiago)" >> /etc/redhat-release

In order to avoid the error “Cannot find a valid kernel include directory” during module compilation, install the packages required to compile modules for your kernel version (kernel source, rpmbuild, …).

[root@gpfs01 src]# yum install kernel-headers-2.6.32-504.12.2.el6

[root@gpfs01 src]# yum install kernel-devel-2.6.32-504.12.2.el6
[root@gpfs01 src]# yum install imake gcc-c++ rpmbuild

Compile the module:

[root@gpfs01 ~]# cd /usr/lpp/mmfs/src

[root@gpfs01 src]# make Autoconfig

[root@gpfs01 src]# make World

[root@gpfs01 src]# make InstallImages

[root@gpfs01 src]# make rpm

At the end of this task the rpm  gpfs.gplbin-2.6.32-504.12.2.el6.x86_64-4.1.0-5.x86_64.rpm is saved in the directory /root/rpmbuild/RPMS/x86_64/

6) Install GPFS portability layer rpm

Distribute GPFS portability layer rpm on each node and install it.

[root@gpfs01 src]# scp /root/rpmbuild/RPMS/x86_64/gpfs.gplbin-2.6.32-504.12.2.el6.x86_64-4.1.0-5.x86_64.rpm gpfs02:/tmp/gpfs.gplbin-2.6.32-504.12.2.el6.x86_64-4.1.0-5.x86_64.rpm

[root@gpfs01 src]# yum localinstall /root/rpmbuild/RPMS/x86_64/gpfs.gplbin-2.6.32-504.12.2.el6.x86_64-4.1.0-5.x86_64.rpm

[root@gpfs02 ~]# yum localinstall /tmp/gpfs.gplbin-2.6.32-504.12.2.el6.x86_64-4.1.0-5.x86_64.rpm

7) Create GPFS cluster

In this step the cluster is created by adding node gpfs01 with the cluster manager and quorum node roles. In the next steps node gpfs02 will be added to the cluster.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmcrcluster -N gpfs01:manager-quorum -p gpfs01 -r /usr/bin/ssh -R /usr/bin/scp

mmcrcluster: Performing preliminary node verification ...
mmcrcluster: Processing quorum and other critical nodes ...
mmcrcluster: Finalizing the cluster data structures ...
mmcrcluster: Command successfully completed
mmcrcluster: Warning: Not all nodes have proper GPFS license designations.
Use the mmchlicense command to designate licenses as needed

The command mmlscluster shows some information about the GPFS cluster:

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmlscluster

===============================================================================
| Warning: |
| This cluster contains nodes that do not have a proper GPFS license |
| designation. This violates the terms of the GPFS licensing agreement. |
| Use the mmchlicense command and assign the appropriate GPFS licenses |
| to each of the nodes in the cluster. For more information about GPFS |
| license designation, see the Concepts, Planning, and Installation Guide. |
===============================================================================
GPFS cluster information
========================
GPFS cluster name: gpfs01
GPFS cluster id: 14526312809412325839
GPFS UID domain: gpfs01
Remote shell command: /usr/bin/ssh
Remote file copy command: /usr/bin/scp
Repository type: CCR

Node Daemon node name IP address Admin node name Designation
--------------------------------------------------------------------
1 gpfs01 172.17.0.101 gpfs01 quorum-manager

You need to accept the GPFS server license for node gpfs01.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmchlicense server --accept -N gpfs01

The following nodes will be designated as possessing GPFS server licenses:
gpfs01
mmchlicense: Command successfully completed

8) Start gpfs cluster on node gpfs01

Start gpfs cluster on node gpfs01

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmstartup -N gpfs01
Fri Apr 24 18:43:35 CEST 2015: mmstartup: Starting GPFS ...

Verify the node status

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmgetstate -a

Node number Node name GPFS state
------------------------------------------
1 gpfs01 active

9) Add the second node to GPFS

In this step the second server gpfs02 will be added to the gpfs cluster.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmaddnode -N gpfs02
Fri Apr 24 18:44:54 CEST 2015: mmaddnode: Processing node gpfs02
mmaddnode: Command successfully completed
mmaddnode: Warning: Not all nodes have proper GPFS license designations.
Use the mmchlicense command to designate licenses as needed.
mmaddnode: Propagating the cluster configuration data to all
affected nodes. This is an asynchronous process.

Now the command mmlscluster shows that the cluster is composed of two servers.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmlscluster

GPFS cluster information

========================
GPFS cluster name: gpfs01
GPFS cluster id: 14526312809412325839
GPFS UID domain: gpfs01
Remote shell command: /usr/bin/ssh
Remote file copy command: /usr/bin/scp
Repository type: CCR

Node Daemon node name IP address Admin node name Designation
--------------------------------------------------------------------
1 gpfs01 172.17.0.101 gpfs01 quorum-manager
2 gpfs02 172.17.0.102 gpfs02

You have to accept the gpfs server license for server gpfs02

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmchlicense server --accept -N gpfs02

The following nodes will be designated as possessing GPFS server licenses:
gpfs02
mmchlicense: Command successfully completed
mmchlicense: Propagating the cluster configuration data to all
affected nodes. This is an asynchronous process.

10) Start GPFS on gpfs02 node

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmstartup -N gpfs02
Fri Apr 24 18:47:32 CEST 2015: mmstartup: Starting GPFS ...

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmgetstate -a

Node number Node name GPFS state
------------------------------------------
1 gpfs01 active
2 gpfs02 active

11) Create NSD configuration

Now you have to create a file containing the configuration of the disks that will be used by GPFS. In GPFS terminology these disks are called Network Shared Disks (NSD).

The file diskdef.txt shown below contains the NSD configuration used by GPFS.

Two NSDs have been defined; their names are mynsd1 and mynsd2 and their device files are /dev/sdb and /dev/sdc respectively. Both disks will be used to store data and metadata.

[root@gpfs01 ~]# cat diskdef.txt

%nsd:
device=/dev/sdb
nsd=mynsd1
usage=dataAndMetadata

%nsd:
device=/dev/sdc
nsd=mynsd2
usage=dataAndMetadata

Configure the NSD using this configuration file

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmcrnsd -F diskdef.txt
mmcrnsd: Processing disk sdb
mmcrnsd: Processing disk sdc
mmcrnsd: Propagating the cluster configuration data to all
affected nodes. This is an asynchronous process.

Show the NSD configuration

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmlsnsd

File system Disk name NSD servers
---------------------------------------------------------------------------
(free disk) mynsd1 (directly attached)
(free disk) mynsd2 (directly attached)

12) Create GPFS filesystem

The following command creates a GPFS filesystem called fs_gpfs01, using the NSDs defined in the diskdef.txt file, that will be mounted on the /fs_gpfs01 mount point.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmcrfs fs_gpfs01 -F diskdef.txt -A yes -T /fs_gpfs01

The following disks of fs_gpfs01 will be formatted on node gpfs01.example.com:
mynsd1: size 10240 MB
mynsd2: size 10240 MB
Formatting file system ...
Disks up to size 103 GB can be added to storage pool system.
Creating Inode File
86 % complete on Fri Apr 24 19:30:27 2015
100 % complete on Fri Apr 24 19:30:27 2015
Creating Allocation Maps
Creating Log Files
Clearing Inode Allocation Map
Clearing Block Allocation Map
Formatting Allocation Map for storage pool system
Completed creation of file system /dev/fs_gpfs01.
mmcrfs: Propagating the cluster configuration data to all
affected nodes. This is an asynchronous process.

13) Mount/Unmount gpfs

This step shows some useful commands to mount and unmount the GPFS filesystem.

  • Mount on all nodes
[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmmount /fs_gpfs01 -a
Thu May 7 21:30:33 CEST 2015: mmmount: Mounting file systems ...
  • Unmount on all nodes
[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmumount /fs_gpfs01 -a
  • Mount on the local node
[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmmount /fs_gpfs01
  • Unmount on the local node
[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmumount /fs_gpfs01

14) Verify mount gpfs filesystem

You can verify that the GPFS filesystem is mounted using the command mmlsmount or df.

[root@gpfs01 ~]# /usr/lpp/mmfs/bin/mmlsmount all
File system fs_gpfs01 is mounted on 2 nodes.

[root@gpfs01 ~]# df
Filesystem 1K-blocks Used Available Use% Mounted on
/dev/mapper/vg_gpfs01-lv_root 17938864 1278848 15742104 8% /
tmpfs 1958512 0 1958512 0% /dev/shm
/dev/sda1 487652 69865 392187 16% /boot
/dev/fs_gpfs01 20971520 533248 20438272 3% /fs_gpfs01

15) Log location

GPFS log files are located in the directory /var/adm/ras.

[root@gpfs01 ~]# ls -latr /var/adm/ras
total 20
drwxr-xr-x. 3 root root 4096 Apr 13 16:46 ..
lrwxrwxrwx 1 root root 35 Apr 24 20:33 mmfs.log.previous -> mmfs.log.2015.04.24.20.33.44.gpfs01
-rw-r--r-- 1 root root 3835 May 7 21:11 mmfs.log.2015.04.24.20.33.44.gpfs01
-rw-r--r-- 1 root root 195 May 7 21:11 mmsdrserv.log
lrwxrwxrwx 1 root root 35 May 7 21:24 mmfs.log.latest -> mmfs.log.2015.05.07.21.24.18.gpfs01
drwxr-xr-x. 2 root root 4096 May 7 21:24 .
-rw-r--r-- 1 root root 2717 May 7 21:24 mmfs.log.2015.05.07.21.24.18.gpfs01

GPFS also records system and disk failures using syslog; GPFS error messages can be retrieved using the command:

grep "mmfs:" /var/log/messages

16) Add /usr/lpp/mmfs/bin/ to the PATH environment variable

In order to avoid using the full path for GPFS commands, the directory /usr/lpp/mmfs/bin/ can be added to the PATH environment variable of root.

Add the line “PATH=$PATH:/usr/lpp/mmfs/bin/” in /root/.bash_profile before the line “export PATH”

[root@gpfs01 ~]# source .bash_profile