Spark & R: Loading Data into SparkSQL Data Frames
In this second tutorial (see the first one) we will introduce basic concepts about SparkSQL with R that you can find in the SparkR documentation, applied to the 2013 American Community Survey dataset. We will do two things: read data into a SparkSQL data frame, and take a quick look at the schema and what we have read. We will work with IPython/Jupyter notebooks here. The first tutorial explained how to download the files and start a Jupyter notebook using SparkR; you will need to complete it first, so that the data is downloaded locally.
All the code for this series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.
Instructions
As we already mentioned, for this series of tutorials/notebooks we use Jupyter with the IRkernel R kernel. You can find installation instructions for your specific setup here.
A good way of using these notebooks is by first cloning the repo, and then starting your Jupyter in pySpark mode. For example, if we have a standalone Spark installation running on our localhost with a maximum of 6Gb per node assigned to IPython:
MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.5.0-bin-hadoop2.6/bin/pyspark
Notice that the path to the pyspark command will depend on your specific installation. So, as a requirement, you need to have Spark installed on the same machine where you are going to start the IPython notebook server.
For more Spark options see here. In general, the rule is that an option described in the form spark.executor.memory is passed as SPARK_EXECUTOR_MEMORY when calling IPython/pySpark.
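Alternatively, such options can also be set from R itself once the SparkR library is loaded (as we do later in this notebook): sparkR.init accepts a sparkEnvir list. A minimal sketch, assuming the same standalone master on localhost:

# A sketch: the same executor memory setting, passed from R via sparkEnvir
# instead of an environment variable on the command line.
sc <- sparkR.init(master="spark://127.0.0.1:7077",
                  sparkEnvir=list(spark.executor.memory="6g"))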
Datasets
2013 American Community Survey dataset
Every year, the US Census Bureau runs the American Community Survey. In this survey, approximately 3.5 million households are asked detailed questions about who they are and how they live. Many topics are covered, including ancestry, education, work, transportation, internet use, and residency. You can go directly to the source to learn more about the data and get files for different years, longer periods, individual states, etc.
In any case, the starting-up notebook will download the 2013 data locally for later use with the rest of the notebooks.
The idea of using this dataset came from its recently being announced on Kaggle as part of their Kaggle scripts datasets. There you will be able to analyse the dataset on site, while sharing your results with other Kaggle users. Highly recommended!
Creating a SparkSQL Context
In this and the following notebooks, we will explore our data by loading it into SparkSQL data frames. But first we need to initialise a SparkSQL context. The first thing we need to do is set up some environment variables and library paths as follows. Remember to replace the value assigned to SPARK_HOME with your Spark home folder.
# Set Spark home and R libs
Sys.setenv(SPARK_HOME='/home/cluster/spark-1.5.0-bin-hadoop2.6')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
Now we can load the SparkR library as follows.
library(SparkR)
Attaching package: ‘SparkR’
The following objects are masked from ‘package:stats’:
filter, na.omit
The following objects are masked from ‘package:base’:
intersect, rbind, sample, subset, summary, table, transform
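Should you still need the original base/stats versions of these masked functions for local (non-distributed) objects, the :: operator always reaches them. A small sketch with a hypothetical local data frame:

# Masked functions can still be called explicitly on regular R objects.
local_df <- data.frame(a=c(1, 2, NA))  # hypothetical local data frame
stats::na.omit(local_df)               # the original stats version
base::subset(local_df, a > 1)          # the original base version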
And now we can initialise the Spark context as in the official documentation. In our case we use a standalone Spark cluster with one master and seven workers. If you are running Spark in local mode, just use master='local'. Additionally, we require a Spark package from Databricks to read CSV files (more on this in the next section).
sc <- sparkR.init(master='spark://169.254.206.2:7077', sparkPackages="com.databricks:spark-csv_2.11:1.2.0")
Launching java with spark-submit command /home/cluster/spark-1.5.0-bin-hadoop2.6/bin/spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpyC8rUQ/backend_port3743359adb78
And finally we can start the SparkSQL context as follows.
sqlContext <- sparkRSQL.init(sc)
Creating SparkSQL Data Frames
Reading CSV Data Using Databricks CSV Extension
The easiest way to get our CSV data into a SparkSQL data frame is to use the Databricks CSV extension to read SparkSQL data frames directly from CSV files. In any case, remember to set the right path for your data files in the first lines below; ours is /nfs/data/2013-acs/ss13husa.csv.
housing_a_file_path <- file.path('', 'nfs','data','2013-acs','ss13husa.csv')
housing_b_file_path <- file.path('', 'nfs','data','2013-acs','ss13husb.csv')
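The empty first argument to file.path gives us absolute paths; printing the variables is a quick sanity check:

# Quick check that the paths point where we expect.
print(housing_a_file_path)  # "/nfs/data/2013-acs/ss13husa.csv"
print(housing_b_file_path)  # "/nfs/data/2013-acs/ss13husb.csv"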
Now let's read the data into a SparkSQL data frame. We need to pass four parameters in addition to the sqlContext:
- The file path.
- header='true', since our CSV files have a header with the column names.
- inferSchema='true', to indicate that we want the library to infer the schema.
- And the source type (the Databricks package in this case).
system.time(
housing_a_df <- read.df(sqlContext,
housing_a_file_path,
header='true',
source = "com.databricks.spark.csv",
inferSchema='true')
)
user system elapsed
0.002 0.000 16.919
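As a side note, the inferSchema pass is what makes this read relatively slow, since it requires an extra scan of the data. If we omitted it, the Databricks CSV source would load every column as a string, which is faster but pushes type conversion onto us later. A sketch (housing_a_raw is a hypothetical name):

# Without inferSchema, every column comes back as string: faster to load,
# but numeric operations would then require explicit casts.
housing_a_raw <- read.df(sqlContext,
                         housing_a_file_path,
                         header='true',
                         source = "com.databricks.spark.csv")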
Let's have a look at the inferred schema.
system.time(
printSchema(housing_a_df)
)
root
|-- RT: string (nullable = true)
|-- SERIALNO: integer (nullable = true)
|-- DIVISION: integer (nullable = true)
|-- PUMA: integer (nullable = true)
|-- REGION: integer (nullable = true)
|-- ST: integer (nullable = true)
|-- ADJHSG: integer (nullable = true)
|-- ADJINC: integer (nullable = true)
|-- WGTP: integer (nullable = true)
|-- NP: integer (nullable = true)
|-- TYPE: integer (nullable = true)
|-- ACCESS: integer (nullable = true)
|-- ACR: integer (nullable = true)
|-- AGS: integer (nullable = true)
|-- BATH: integer (nullable = true)
|-- BDSP: integer (nullable = true)
|-- BLD: integer (nullable = true)
|-- BROADBND: integer (nullable = true)
|-- BUS: integer (nullable = true)
|-- COMPOTHX: integer (nullable = true)
|-- CONP: integer (nullable = true)
|-- DIALUP: integer (nullable = true)
|-- DSL: integer (nullable = true)
|-- ELEP: integer (nullable = true)
|-- FIBEROP: integer (nullable = true)
|-- FS: integer (nullable = true)
|-- FULP: integer (nullable = true)
|-- GASP: integer (nullable = true)
|-- HANDHELD: integer (nullable = true)
|-- HFL: integer (nullable = true)
|-- INSP: integer (nullable = true)
|-- LAPTOP: integer (nullable = true)
|-- MHP: integer (nullable = true)
|-- MODEM: integer (nullable = true)
|-- MRGI: integer (nullable = true)
|-- MRGP: integer (nullable = true)
|-- MRGT: integer (nullable = true)
|-- MRGX: integer (nullable = true)
|-- OTHSVCEX: integer (nullable = true)
|-- REFR: integer (nullable = true)
|-- RMSP: integer (nullable = true)
|-- RNTM: integer (nullable = true)
|-- RNTP: integer (nullable = true)
|-- RWAT: integer (nullable = true)
|-- RWATPR: integer (nullable = true)
|-- SATELLITE: integer (nullable = true)
|-- SINK: integer (nullable = true)
|-- SMP: integer (nullable = true)
|-- STOV: integer (nullable = true)
|-- TEL: integer (nullable = true)
|-- TEN: integer (nullable = true)
|-- TOIL: integer (nullable = true)
|-- VACS: integer (nullable = true)
|-- VALP: integer (nullable = true)
|-- VEH: integer (nullable = true)
|-- WATP: integer (nullable = true)
|-- YBL: integer (nullable = true)
|-- FES: integer (nullable = true)
|-- FFINCP: integer (nullable = true)
|-- FGRNTP: integer (nullable = true)
|-- FHINCP: integer (nullable = true)
|-- FINCP: integer (nullable = true)
|-- FPARC: integer (nullable = true)
|-- FSMOCP: integer (nullable = true)
|-- GRNTP: integer (nullable = true)
|-- GRPIP: integer (nullable = true)
|-- HHL: integer (nullable = true)
|-- HHT: integer (nullable = true)
|-- HINCP: integer (nullable = true)
|-- HUGCL: integer (nullable = true)
|-- HUPAC: integer (nullable = true)
|-- HUPAOC: integer (nullable = true)
|-- HUPARC: integer (nullable = true)
|-- KIT: integer (nullable = true)
|-- LNGI: integer (nullable = true)
|-- MULTG: integer (nullable = true)
|-- MV: integer (nullable = true)
|-- NOC: integer (nullable = true)
|-- NPF: integer (nullable = true)
|-- NPP: integer (nullable = true)
|-- NR: integer (nullable = true)
|-- NRC: integer (nullable = true)
|-- OCPIP: integer (nullable = true)
|-- PARTNER: integer (nullable = true)
|-- PLM: integer (nullable = true)
|-- PSF: integer (nullable = true)
|-- R18: integer (nullable = true)
|-- R60: integer (nullable = true)
|-- R65: integer (nullable = true)
|-- RESMODE: integer (nullable = true)
|-- SMOCP: integer (nullable = true)
|-- SMX: integer (nullable = true)
|-- SRNT: integer (nullable = true)
|-- SSMC: integer (nullable = true)
|-- SVAL: integer (nullable = true)
|-- TAXP: integer (nullable = true)
|-- WIF: integer (nullable = true)
|-- WKEXREL: integer (nullable = true)
|-- WORKSTAT: integer (nullable = true)
|-- FACCESSP: integer (nullable = true)
|-- FACRP: integer (nullable = true)
|-- FAGSP: integer (nullable = true)
|-- FBATHP: integer (nullable = true)
|-- FBDSP: integer (nullable = true)
|-- FBLDP: integer (nullable = true)
|-- FBROADBNDP: integer (nullable = true)
|-- FBUSP: integer (nullable = true)
|-- FCOMPOTHXP: integer (nullable = true)
|-- FCONP: integer (nullable = true)
|-- FDIALUPP: integer (nullable = true)
|-- FDSLP: integer (nullable = true)
|-- FELEP: integer (nullable = true)
|-- FFIBEROPP: integer (nullable = true)
|-- FFSP: integer (nullable = true)
|-- FFULP: integer (nullable = true)
|-- FGASP: integer (nullable = true)
|-- FHANDHELDP: integer (nullable = true)
|-- FHFLP: integer (nullable = true)
|-- FINSP: integer (nullable = true)
|-- FKITP: integer (nullable = true)
|-- FLAPTOPP: integer (nullable = true)
|-- FMHP: integer (nullable = true)
|-- FMODEMP: integer (nullable = true)
|-- FMRGIP: integer (nullable = true)
|-- FMRGP: integer (nullable = true)
|-- FMRGTP: integer (nullable = true)
|-- FMRGXP: integer (nullable = true)
|-- FMVP: integer (nullable = true)
|-- FOTHSVCEXP: integer (nullable = true)
|-- FPLMP: integer (nullable = true)
|-- FREFRP: integer (nullable = true)
|-- FRMSP: integer (nullable = true)
|-- FRNTMP: integer (nullable = true)
|-- FRNTP: integer (nullable = true)
|-- FRWATP: integer (nullable = true)
|-- FRWATPRP: integer (nullable = true)
|-- FSATELLITEP: integer (nullable = true)
|-- FSINKP: integer (nullable = true)
|-- FSMP: integer (nullable = true)
|-- FSMXHP: integer (nullable = true)
|-- FSMXSP: integer (nullable = true)
|-- FSTOVP: integer (nullable = true)
|-- FTAXP: integer (nullable = true)
|-- FTELP: integer (nullable = true)
|-- FTENP: integer (nullable = true)
|-- FTOILP: integer (nullable = true)
|-- FVACSP: integer (nullable = true)
|-- FVALP: integer (nullable = true)
|-- FVEHP: integer (nullable = true)
|-- FWATP: integer (nullable = true)
|-- FYBLP: integer (nullable = true)
|-- wgtp1: integer (nullable = true)
|-- wgtp2: integer (nullable = true)
|-- wgtp3: integer (nullable = true)
|-- wgtp4: integer (nullable = true)
|-- wgtp5: integer (nullable = true)
|-- wgtp6: integer (nullable = true)
|-- wgtp7: integer (nullable = true)
|-- wgtp8: integer (nullable = true)
|-- wgtp9: integer (nullable = true)
|-- wgtp10: integer (nullable = true)
|-- wgtp11: integer (nullable = true)
|-- wgtp12: integer (nullable = true)
|-- wgtp13: integer (nullable = true)
|-- wgtp14: integer (nullable = true)
|-- wgtp15: integer (nullable = true)
|-- wgtp16: integer (nullable = true)
|-- wgtp17: integer (nullable = true)
|-- wgtp18: integer (nullable = true)
|-- wgtp19: integer (nullable = true)
|-- wgtp20: integer (nullable = true)
|-- wgtp21: integer (nullable = true)
|-- wgtp22: integer (nullable = true)
|-- wgtp23: integer (nullable = true)
|-- wgtp24: integer (nullable = true)
|-- wgtp25: integer (nullable = true)
|-- wgtp26: integer (nullable = true)
|-- wgtp27: integer (nullable = true)
|-- wgtp28: integer (nullable = true)
|-- wgtp29: integer (nullable = true)
|-- wgtp30: integer (nullable = true)
|-- wgtp31: integer (nullable = true)
|-- wgtp32: integer (nullable = true)
|-- wgtp33: integer (nullable = true)
|-- wgtp34: integer (nullable = true)
|-- wgtp35: integer (nullable = true)
|-- wgtp36: integer (nullable = true)
|-- wgtp37: integer (nullable = true)
|-- wgtp38: integer (nullable = true)
|-- wgtp39: integer (nullable = true)
|-- wgtp40: integer (nullable = true)
|-- wgtp41: integer (nullable = true)
|-- wgtp42: integer (nullable = true)
|-- wgtp43: integer (nullable = true)
|-- wgtp44: integer (nullable = true)
|-- wgtp45: integer (nullable = true)
|-- wgtp46: integer (nullable = true)
|-- wgtp47: integer (nullable = true)
|-- wgtp48: integer (nullable = true)
|-- wgtp49: integer (nullable = true)
|-- wgtp50: integer (nullable = true)
|-- wgtp51: integer (nullable = true)
|-- wgtp52: integer (nullable = true)
|-- wgtp53: integer (nullable = true)
|-- wgtp54: integer (nullable = true)
|-- wgtp55: integer (nullable = true)
|-- wgtp56: integer (nullable = true)
|-- wgtp57: integer (nullable = true)
|-- wgtp58: integer (nullable = true)
|-- wgtp59: integer (nullable = true)
|-- wgtp60: integer (nullable = true)
|-- wgtp61: integer (nullable = true)
|-- wgtp62: integer (nullable = true)
|-- wgtp63: integer (nullable = true)
|-- wgtp64: integer (nullable = true)
|-- wgtp65: integer (nullable = true)
|-- wgtp66: integer (nullable = true)
|-- wgtp67: integer (nullable = true)
|-- wgtp68: integer (nullable = true)
|-- wgtp69: integer (nullable = true)
|-- wgtp70: integer (nullable = true)
|-- wgtp71: integer (nullable = true)
|-- wgtp72: integer (nullable = true)
|-- wgtp73: integer (nullable = true)
|-- wgtp74: integer (nullable = true)
|-- wgtp75: integer (nullable = true)
|-- wgtp76: integer (nullable = true)
|-- wgtp77: integer (nullable = true)
|-- wgtp78: integer (nullable = true)
|-- wgtp79: integer (nullable = true)
|-- wgtp80: integer (nullable = true)
user system elapsed
0.002 0.000 0.062
Looks good. Let's have a look at the first few rows.
head(housing_a_df)
| . | RT | SERIALNO | DIVISION | PUMA | REGION | ST | ADJHSG | ADJINC | WGTP | NP | ⋯ | wgtp71 | wgtp72 | wgtp73 | wgtp74 | wgtp75 | wgtp76 | wgtp77 | wgtp78 | wgtp79 | wgtp80 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | H | 84 | 6 | 2600 | 3 | 1 | 1000000 | 1007549 | 0 | 1 | ⋯ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2 | H | 154 | 6 | 2500 | 3 | 1 | 1000000 | 1007549 | 51 | 4 | ⋯ | 86 | 53 | 59 | 84 | 49 | 15 | 15 | 20 | 50 | 16 |
| 3 | H | 156 | 6 | 1700 | 3 | 1 | 1000000 | 1007549 | 449 | 1 | ⋯ | 161 | 530 | 601 | 579 | 341 | 378 | 387 | 421 | 621 | 486 |
| 4 | H | 160 | 6 | 2200 | 3 | 1 | 1000000 | 1007549 | 16 | 3 | ⋯ | 31 | 24 | 33 | 7 | 7 | 13 | 18 | 23 | 23 | 5 |
| 5 | H | 231 | 6 | 2400 | 3 | 1 | 1000000 | 1007549 | 52 | 1 | ⋯ | 21 | 18 | 37 | 49 | 103 | 38 | 49 | 51 | 46 | 47 |
| 6 | H | 286 | 6 | 900 | 3 | 1 | 1000000 | 1007549 | 76 | 1 | ⋯ | 128 | 25 | 68 | 66 | 80 | 26 | 66 | 164 | 88 | 24 |
And let's count how many rows we have in the first dataset. For that we use nrow, just as we do with regular R data frames; the SparkR package simply redefines nrow for SparkSQL data frames.
nrow(housing_a_df)
756065
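As an aside, since these are SparkSQL data frames, the same count can be expressed as a SQL query once the data frame is registered as a temporary table. A minimal sketch (housing_a is a hypothetical table name):

# The SQL route to the same count, via a temporary table.
registerTempTable(housing_a_df, "housing_a")
head(sql(sqlContext, "SELECT COUNT(*) FROM housing_a"))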
Let's read the second housing data frame and count the number of rows.
system.time(
housing_b_df <- read.df(sqlContext,
housing_b_file_path,
header='true',
source = "com.databricks.spark.csv",
inferSchema='true')
)
user system elapsed
0.128 0.016 9.666
print(nrow(housing_b_df))
[1] 720248
Merging Data Frames
Now we can use rbind(), as we do with regular R data frames, to put both of them together. Again, SparkR redefines many common R functions to work with SparkSQL data frames. Let's use rbind as follows.
housing_df <- rbind(housing_a_df, housing_b_df)
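Note that rbind requires both data frames to share the same column layout, which holds here since both ACS housing files follow one schema. A quick sanity check using SparkR's columns accessor:

# Both halves must expose identical column layouts for rbind to work.
stopifnot(identical(columns(housing_a_df), columns(housing_b_df)))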
And let's count how many rows we have in the complete data frame.
system.time(
housing_samples <- nrow(housing_df)
)
print(housing_samples)
user system elapsed
0.001 0.000 17.206
[1] 1476313
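Since we are about to scan all 231 columns of this merged data frame, it may pay off to cache it in cluster memory first. A sketch (whether it actually helps depends on the memory available to your workers):

# Persist the merged data frame; the next action materialises the cache,
# and later full scans (such as describe) can reuse it.
cache(housing_df)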
Finally, let's get a glimpse of what it is like to explore data using SparkR by calling the summary function on the data frame. If we have a look at the documentation using ?summary, we see that we are redirected to describe, which optionally accepts column names. Let's use it on the whole data frame, that is, all 231 columns and 1476313 rows. Note: we use collect here because the result of describe is returned as a DataFrame object, and we need to print it in the notebook.
system.time(
housing_summary <- describe(housing_df)
)
user system elapsed
0.174 0.016 197.755
collect(housing_summary)
| . | summary | RT | SERIALNO | DIVISION | PUMA | REGION | ST | ADJHSG | ADJINC | WGTP | ⋯ | wgtp71 | wgtp72 | wgtp73 | wgtp74 | wgtp75 | wgtp76 | wgtp77 | wgtp78 | wgtp79 | wgtp80 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | count | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | ⋯ | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 | 1476313 |
| 2 | mean | NA | 746447.7617598707 | 5.114928202894643 | 4458.457590632881 | 2.6305945961323918 | 27.8048550679971 | 1000000.0 | 1007549.0 | 89.95933585899468 | ⋯ | 89.95847289836234 | 89.9587072660066 | 89.95868220357065 | 89.95941511048132 | 89.95922341671448 | 89.95888473514763 | 89.95931147392186 | 89.95866933367111 | 89.95865239959276 | 89.95870862073286 |
| 3 | stddev | NA | NaN | 2.476670855019605 | NaN | 1.0144176501154056 | 15.894404191598897 | NaN | NaN | 80.66731786485431 | ⋯ | 99.43323120400835 | 100.08749152422924 | 98.37291356061534 | 99.52607767089758 | 98.67766416960376 | 99.11341616484188 | 99.05644177173914 | 98.94559614502197 | 97.87387141885895 | 100.18396018153416 |
| 4 | min | H | 1 | 1 | 100 | 1 | 1 | 1000000 | 1007549 | 0 | ⋯ | -8 | -494 | -14 | -151 | -38 | -5 | -3 | -16 | -22 | -243 |
| 5 | max | H | 1492845 | 9 | 70301 | 4 | 56 | 1000000 | 1007549 | 1829 | ⋯ | 2282 | 2328 | 2393 | 2348 | 2263 | 2310 | 2131 | 2794 | 2710 | 2447 |
Or we can select individual column summaries using select as follows (here is a dictionary with the meaning of each column), where VALP is the property value.
collect(select(housing_summary,"VALP"))
| . | VALP |
|---|------|
| 1 | 859691 |
| 2 | 247682.84302150423 |
| 3 | NaN |
| 4 | 100 |
| 5 | 4775000 |
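The same pattern extends to several columns at once by passing a vector of names to select; for example (RNTP is, per the data dictionary, the monthly rent):

# Summaries for several columns at once: property value and monthly rent.
collect(select(housing_summary, c("VALP", "RNTP")))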
Conclusions
And that's it. In this tutorial we have shown how to load a CSV file into a SparkSQL data frame using SparkR. We also had a look at the loaded data, mainly at the number of samples and the data summary.
We have already started to see how the SparkR implementation tries to create a language that feels familiar to the R user, not just by using a data frame abstraction but also by defining a series of functions equivalent to the regular R ones. However, we have to remember that SparkR data frames are distributed data structures, which opens the door to scalable data analysis.
And finally, remember that all the code for this series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.