
Spark & R: data frame operations with SparkR

Published Sep 21, 2015 · Last updated Apr 12, 2017

In this third tutorial (see the previous one), we will introduce more advanced concepts about SparkSQL with R that you can find in the SparkR documentation, applied to the 2013 American Community Survey housing data. These concepts relate to data frame manipulation, including data slicing, summary statistics, and aggregations, and we will use them in combination with ggplot2 visualisations. We will explain what we do at every step, but if you want to go deeper into ggplot2 for exploratory data analysis, I took this Udacity on-line course in the past and I highly recommend it!

All the code for these series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.

Creating a SparkSQL Context and Loading Data

In order to explore our data, we first need to load it into a SparkSQL data frame, and for that we need to initialise a SparkSQL context. Before anything else, we set up some environment variables and library paths as follows. Remember to replace the value assigned to SPARK_HOME with your Spark home folder.

# Set Spark home and R libs
Sys.setenv(SPARK_HOME='/home/cluster/spark-1.5.0-bin-hadoop2.6')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))

Now we can load the SparkR library as follows.

library(SparkR)
    Attaching package: ‘SparkR’
    
    The following objects are masked from ‘package:stats’:
    
        filter, na.omit
    
    The following objects are masked from ‘package:base’:
    
        intersect, rbind, sample, subset, summary, table, transform

And now we can initialise the Spark context as in the official documentation. In our case we are using a standalone Spark cluster with one master and seven workers. If you are running Spark in local mode, just use master='local'. Additionally, we require a Spark package from Databricks to read CSV files (more on this in the previous notebook).

sc <- sparkR.init(master='spark://169.254.206.2:7077', sparkPackages="com.databricks:spark-csv_2.11:1.2.0")
    Launching java with spark-submit command /home/cluster/spark-1.5.0-bin-hadoop2.6/bin/spark-submit  --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpfRY7gu/backend_port4c2413c05644 

And finally we can start the SparkSQL context as follows.

sqlContext <- sparkRSQL.init(sc)

Now that we have our SparkSQL context ready, we can use it to load our CSV data into data frames. We have downloaded our 2013 American Community Survey dataset files in notebook 0, so they should be stored locally. Remember to set the right path for your data files in the lines below; ours is /nfs/data/2013-acs/ss13husa.csv.

housing_a_file_path <- file.path('', 'nfs','data','2013-acs','ss13husa.csv')
housing_b_file_path <- file.path('', 'nfs','data','2013-acs','ss13husb.csv')

Now let's read the data into a SparkSQL DataFrame. We need to pass four parameters in addition to the sqlContext:

  • The file path.
  • header='true' since our csv files have a header with the column names.
  • Indicate that we want the library to infer the schema.
  • And the source type (the Databricks package in this case).

Remember that both the housing and the population data come split into two separate files each (a and b), so we need to combine the two housing files into a single data frame.

housing_a_df <- read.df(sqlContext, 
                            housing_a_file_path, 
                            header='true', 
                            source = "com.databricks.spark.csv", 
                            inferSchema='true')
housing_b_df <- read.df(sqlContext, 
                            housing_b_file_path, 
                            header='true', 
                            source = "com.databricks.spark.csv", 
                            inferSchema='true')
housing_df <- rbind(housing_a_df, housing_b_df)

Let's check that everything is there by counting the rows and listing a few of them.

nrow(housing_df)
1476313
head(housing_df)
. RT SERIALNO DIVISION PUMA REGION ST ADJHSG ADJINC WGTP NP ... wgtp71 wgtp72 wgtp73 wgtp74 wgtp75 wgtp76 wgtp77 wgtp78 wgtp79 wgtp80
1 H 84 6 2600 3 1 1000000 1007549 0 1 0 0 0 0 0 0 0 0 0 0
2 H 154 6 2500 3 1 1000000 1007549 51 4 86 53 59 84 49 15 15 20 50 16
3 H 156 6 1700 3 1 1000000 1007549 449 1 161 530 601 579 341 378 387 421 621 486
4 H 160 6 2200 3 1 1000000 1007549 16 3 31 24 33 7 7 13 18 23 23 5
5 H 231 6 2400 3 1 1000000 1007549 52 1 21 18 37 49 103 38 49 51 46 47
6 H 286 6 900 3 1 1000000 1007549 76 1 128 25 68 66 80 26 66 164 88 24

Giving ggplot2 a try

Before we dive into data selection and aggregations, let's try plotting something using ggplot2. We will use this library all the time during our exploratory data analysis, and we had better make sure we know how to use it with SparkSQL results.

# if it isn't installed, run
# install.packages("ggplot2")
# from the R console, specifying a CRAN mirror
    
library(ggplot2)

What if we try to use our SparkSQL DataFrame directly in a ggplot?

c <- ggplot(data=housing_df, aes(x=factor(REGION)))
    Error: ggplot2 doesn't know how to deal with data of class DataFrame

Obviously it doesn't work that way. The ggplot function doesn't know how to deal with this type of distributed data frame (the Spark one). Instead, we need to collect the data locally as follows.

housing_region_df_local <- collect(select(housing_df,"REGION"))

Let's have a look at what we got.

str(housing_region_df_local)
    'data.frame':	1476313 obs. of  1 variable:
     $ REGION: int  3 3 3 3 3 3 3 3 3 3 ...

That is, when we collect results from a SparkSQL DataFrame we get a regular R data.frame, which is very convenient since we can manipulate it as needed. For example, let's convert the int values we have for REGION to a factor with proper labels. From our data dictionary we get the meaning of the REGION variable, as well as the different values it can take.

housing_region_df_local$REGION <- factor(
        x=housing_region_df_local$REGION, 
        levels=c(1,2,3,4,9),
        labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)

And now we are ready to create the ggplot object as follows.

c <- ggplot(data=housing_region_df_local, aes(x=factor(REGION)))

And now we can give the plot a proper representation (e.g. a bar plot).

c + geom_bar() + xlab("Region")

(Bar plot: number of samples per region)

We will always follow the same approach. First we perform some operation on the SparkSQL DataFrame object (e.g. a selection), then we collect the results, and then we prepare the resulting data.frame to be represented with ggplot2. But think about what we just did: we plotted all the samples for a given column. That is almost a million and a half data points, which pushes our local R environment and ggplot2 quite hard. In the case of the bar plot we didn't really experience any problems, because it aggregates the data internally, but we will struggle to do scatter plots this way. The preferred kind of visualisations will be those built from aggregations performed on SparkSQL DataFrames, as we will see in further sections.
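
To give a flavour of what is coming, here is a minimal sketch of that preferred pattern, using the aggregation functions (groupBy, agg, n) introduced later in this tutorial: we count the samples per region on the Spark side and plot only the handful of collected rows (region_counts and COUNT are just names introduced here for illustration).

# Aggregate on the Spark side: one row per region with its number of samples
region_counts <- collect(agg(
        groupBy(housing_df, "REGION"), 
        COUNT=n(housing_df$REGION)
))
# Label the regions and plot the already-aggregated counts
region_counts$REGION <- factor(
        region_counts$REGION, 
        levels=c(1,2,3,4,9),
        labels=c('Northeast','Midwest','South','West','Puerto Rico')
)
ggplot(data=region_counts, aes(x=REGION, y=COUNT)) + 
    geom_bar(stat="identity") + xlab("Region")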

Data Selection

In this section we will demonstrate how to select data from a SparkSQL DataFrame object using SparkR.

select and selectExpr

We already made use of the select function, but let's have a look at the documentation.

?select

We see there that we have two flavours of select: one that takes a list of column names (the one we have used so far), and another one, selectExpr, that takes a string containing a SQL expression.

Of course, we can pass more than one column name.

collect(select(housing_df, "REGION", "VALP"))
. REGION VALP
1 3 NA
2 3 25000
3 3 80000
4 3 NA
5 3 NA
6 3 18000
7 3 390000
8 3 120000
9 3 NA
10 3 160000
11 3 NA
12 3 NA
13 3 NA
14 3 NA
15 3 40000
16 3 60000
17 3 60000
18 3 NA
19 3 250000
20 3 110000
21 3 190000
22 3 160000
23 3 NA
24 3 750000
25 3 300000
26 3 NA
27 3 NA
28 3 100000
29 3 20000
30 3 70000
... (rows omitted) ...
1476284 4 130000
1476285 4 NA
1476286 4 NA
1476287 4 150000
1476288 4 110000
1476289 4 NA
1476290 4 200000
1476291 4 NA
1476292 4 250000
1476293 4 NA
1476294 4 500000
1476295 4 7100
1476296 4 175000
1476297 4 500000
1476298 4 NA
1476299 4 25000
1476300 4 150000
1476301 4 240000
1476302 4 NA
1476303 4 NA
1476304 4 NA
1476305 4 NA
1476306 4 12000
1476307 4 56000
1476308 4 99000
1476309 4 NA
1476310 4 NA
1476311 4 150000
1476312 4 NA
1476313 4 NA

When passing column names we can also use the R notation data.frame$column.name, so familiar to R users. How does this notation compare to the name-based one in terms of performance?

system.time(
        collect(select(housing_df, housing_df$VALP))
)
       user  system elapsed 
     30.086   0.032  48.046 
system.time(
        collect(select(housing_df, "VALP"))
)
       user  system elapsed 
     28.766   0.012  46.358 

When using the $ notation, we can even pass expressions as follows.

head(select(housing_df, housing_df$VALP / 100))
. (VALP / 100.0)
1 NA
2 250
3 800
4 NA
5 NA
6 180

So what's the point of selectExpr then? Well, we can pass more complex SQL expressions. For example.

head(selectExpr(housing_df, "(VALP / 100) as VALP_by_100"))
. VALP_by_100
1 NA
2 250
3 800
4 NA
5 NA
6 180

filter, subset, and sql

The previous functions allow us to select columns. In order to select rows, we will use functions such as filter and subset. Call the docs as follows if you want to know more about them.

?filter

With filter we keep the rows of a DataFrame that satisfy a given condition passed as an argument. We can define conditions as SQL condition strings using column names, or by using the $ notation.
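
For instance, the condition we are about to use could be written with the $ notation as in this minimal sketch, equivalent to the SQL string passed below:

# Keep only rows with a property value above 1000, using a column expression
head(select(filter(housing_df, housing_df$VALP > 1000), "REGION", "VALP"))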

For example, and continuing with our property values column, let's keep the rows with property values higher than 1000 together with their region, passing the condition as a SQL string this time.

system.time(
        housing_valp_1000 <- collect(filter(select(housing_df, "REGION", "VALP"), "VALP > 1000"))
)
       user  system elapsed 
     38.043   0.184  56.259 
housing_valp_1000
. REGION VALP
1 3 25000
2 3 80000
3 3 18000
4 3 390000
5 3 120000
6 3 160000
7 3 40000
8 3 60000
9 3 60000
10 3 250000
11 3 110000
12 3 190000
13 3 160000
14 3 750000
15 3 300000
16 3 100000
17 3 20000
18 3 70000
19 3 125000
20 3 1843000
21 3 829000
22 3 84000
23 3 150000
24 3 130000
25 3 90000
26 3 220000
27 3 225000
28 3 65000
29 3 80000
30 3 135000
... (rows omitted) ...
847115 4 120000
847116 4 200000
847117 4 365000
847118 4 600000
847119 4 100000
847120 4 124000
847121 4 200000
847122 4 160000
847123 4 250000
847124 4 285000
847125 4 100000
847126 4 205000
847127 4 189000
847128 4 350000
847129 4 130000
847130 4 150000
847131 4 110000
847132 4 200000
847133 4 250000
847134 4 500000
847135 4 7100
847136 4 175000
847137 4 500000
847138 4 25000
847139 4 150000
847140 4 240000
847141 4 12000
847142 4 56000
847143 4 99000
847144 4 150000

Take into account that we can also perform the previous selection and filtering by using SQL queries against the SparkSQL DataFrame. In order to do that, we first need to register the table as follows.

registerTempTable(housing_df, "housing")

And then we can use the SparkR sql function with the sqlContext as follows.

system.time(
        housing_valp_1000_sql <- collect(sql(sqlContext, "SELECT REGION, VALP FROM housing WHERE VALP >= 1000"))
)
       user  system elapsed 
     38.862   0.008  56.747 
housing_valp_1000_sql
. REGION VALP
1 3 25000
2 3 80000
3 3 18000
4 3 390000
5 3 120000
6 3 160000
7 3 40000
8 3 60000
9 3 60000
10 3 250000
11 3 110000
12 3 190000
13 3 160000
14 3 750000
15 3 300000
16 3 100000
17 3 20000
18 3 70000
19 3 125000
20 3 1843000
21 3 829000
22 3 84000
23 3 150000
24 3 130000
25 3 90000
26 3 220000
27 3 225000
28 3 65000
29 3 80000
30 3 135000
... (rows omitted) ...
848420 4 120000
848421 4 200000
848422 4 365000
848423 4 600000
848424 4 100000
848425 4 124000
848426 4 200000
848427 4 160000
848428 4 250000
848429 4 285000
848430 4 100000
848431 4 205000
848432 4 189000
848433 4 350000
848434 4 130000
848435 4 150000
848436 4 110000
848437 4 200000
848438 4 250000
848439 4 500000
848440 4 7100
848441 4 175000
848442 4 500000
848443 4 25000
848444 4 150000
848445 4 240000
848446 4 12000
848447 4 56000
848448 4 99000
848449 4 150000

This last method can be clearer and more flexible when we need to perform complex queries with multiple conditions. Combinations of filter and select can get verbose compared with the clarity of the SQL lingua franca.
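
For instance, a query combining several conditions over the housing table we registered above might look like the following sketch (NP holds the number of persons in the household according to the data dictionary; the thresholds here are arbitrary):

# A hypothetical multi-condition query, sorted by property value
head(sql(sqlContext, 
    "SELECT REGION, VALP, NP FROM housing 
     WHERE VALP >= 1000 AND NP >= 2 
     ORDER BY VALP DESC"))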

But there is yet another, more functional way of subsetting data frames, one that is very familiar to R users: the subset function. Just have a look at the help page.

?subset

And we use it as follows.

system.time(
        housing_valp_1000_subset <- collect(subset(
            housing_df, housing_df$VALP>1000, 
            c("REGION","VALP")
        ))
)
       user  system elapsed 
     39.751   0.020  57.425 
housing_valp_1000_subset
. REGION VALP
1 3 25000
2 3 80000
3 3 18000
4 3 390000
5 3 120000
6 3 160000
7 3 40000
8 3 60000
9 3 60000
10 3 250000
11 3 110000
12 3 190000
13 3 160000
14 3 750000
15 3 300000
16 3 100000
17 3 20000
18 3 70000
19 3 125000
20 3 1843000
21 3 829000
22 3 84000
23 3 150000
24 3 130000
25 3 90000
26 3 220000
27 3 225000
28 3 65000
29 3 80000
30 3 135000
... (rows omitted) ...
847115 4 120000
847116 4 200000
847117 4 365000
847118 4 600000
847119 4 100000
847120 4 124000
847121 4 200000
847122 4 160000
847123 4 250000
847124 4 285000
847125 4 100000
847126 4 205000
847127 4 189000
847128 4 350000
847129 4 130000
847130 4 150000
847131 4 110000
847132 4 200000
847133 4 250000
847134 4 500000
847135 4 7100
847136 4 175000
847137 4 500000
847138 4 25000
847139 4 150000
847140 4 240000
847141 4 12000
847142 4 56000
847143 4 99000
847144 4 150000

Even more, thanks to SparkR we can use the [] notation familiar from R data.frame objects directly on SparkSQL DataFrames. For example.

system.time(
        housing_valp_1000_bracket <- collect(
            housing_df[housing_df$VALP>1000, c("REGION","VALP")]
        )
)
       user  system elapsed 
     39.090   0.013  56.381 
housing_valp_1000_bracket
. REGION VALP
1 3 25000
2 3 80000
3 3 18000
4 3 390000
5 3 120000
6 3 160000
7 3 40000
8 3 60000
9 3 60000
10 3 250000
11 3 110000
12 3 190000
13 3 160000
14 3 750000
15 3 300000
16 3 100000
17 3 20000
18 3 70000
19 3 125000
20 3 1843000
21 3 829000
22 3 84000
23 3 150000
24 3 130000
25 3 90000
26 3 220000
27 3 225000
28 3 65000
29 3 80000
30 3 135000
... (rows omitted) ...
847115 4 120000
847116 4 200000
847117 4 365000
847118 4 600000
847119 4 100000
847120 4 124000
847121 4 200000
847122 4 160000
847123 4 250000
847124 4 285000
847125 4 100000
847126 4 205000
847127 4 189000
847128 4 350000
847129 4 130000
847130 4 150000
847131 4 110000
847132 4 200000
847133 4 250000
847134 4 500000
847135 4 7100
847136 4 175000
847137 4 500000
847138 4 25000
847139 4 150000
847140 4 240000
847141 4 12000
847142 4 56000
847143 4 99000
847144 4 150000

That is, we have up to four different ways of subsetting a data frame with SparkR. We can plot any of the previous resulting data frames with a ggplot2 chart as we did before.

housing_valp_1000_bracket$REGION <- factor(
        x=housing_valp_1000_bracket$REGION, 
        levels=c(1,2,3,4,9),
        labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
c <- ggplot(data=housing_valp_1000_bracket, aes(x=factor(REGION)))
c + geom_bar() + ggtitle("Samples with VALP>1000") + xlab("Region")

(Bar plot: number of samples with VALP > 1000 per region)

Finally, a function that is useful, especially when imputing missing values in data frames, is isNaN, which can be applied to columns just as we do with regular R data frames.
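
As a minimal sketch of its use, we could count the rows whose property value is NaN; note that missing values read from the CSV usually arrive as SQL NULLs rather than NaN, and isNull is the SparkR function covering that case.

# Rows where VALP is NaN (a floating point "not a number")
count(filter(housing_df, isNaN(housing_df$VALP)))
# Rows where VALP is a SQL NULL (how missing CSV values typically show up)
count(filter(housing_df, isNull(housing_df$VALP)))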

Data Aggregation and Sorting

In the previous notebook we already had a look at summary/describe, which we can pass column names to and get summary statistics that way. If we want to be specific about the statistic instead, SparkR also defines aggregation functions, such as avg, min, max, sum, and n, that we can apply to DataFrame columns.

We use them by passing columns with the $ notation; they return columns themselves, so they need to be part of a select call on a DataFrame. For example.

collect(select(housing_df, avg(housing_df$VALP)))
. avg(VALP)
1 247682.8
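
We can also combine several of these functions in a single select call; a minimal sketch:

# Minimum, maximum, and average property value computed in one pass
collect(select(housing_df, 
        min(housing_df$VALP), 
        max(housing_df$VALP), 
        avg(housing_df$VALP)))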

groupBy and summarize / agg

A basic operation when doing data aggregations on data frames is groupBy. Basically, it groups the DataFrame we pass using the specified columns, so we can run aggregations on them. We use it in combination with summarize/agg in order to apply aggregation functions. For example, following the previous avg example, let's average property values by region as follows.

housing_avg_valp <- collect(agg(
        groupBy(housing_df, "REGION"), 
        NUM_PROPERTIES=n(housing_df$REGION),
        AVG_VALP = avg(housing_df$VALP), 
        MAX_VALUE=max(housing_df$VALP),
        MIN_VALUE=min(housing_df$VALP)
))
housing_avg_valp$REGION <- factor(
        housing_avg_valp$REGION, 
        levels=c(1,2,3,4,9), 
        labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
housing_avg_valp
. REGION NUM_PROPERTIES AVG_VALP MAX_VALUE MIN_VALUE
1 Northeast 268285 314078.1 4775000 100
2 Midwest 328148 168305.3 2381000 100
3 South 560520 204236.9 3934000 100
4 West 319360 365559.3 4727000 110

We can add as many summary/aggregation columns as aggregation functions we want to calculate. We can also add several levels of grouping. For example, let's add the number of bedrooms (BDSP in our dictionary) as follows.

housing_avg_valp <- collect(agg(
        groupBy(housing_df, "REGION", "BDSP"), 
        NUM_PROPERTIES=n(housing_df$REGION),
        AVG_VALP = avg(housing_df$VALP), 
        MAX_VALUE=max(housing_df$VALP),
        MIN_VALUE=min(housing_df$VALP)
))
housing_avg_valp$REGION <- factor(
        housing_avg_valp$REGION, 
        levels=c(1,2,3,4,9), 
        labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
housing_avg_valp
. REGION BDSP NUM_PROPERTIES AVG_VALP MAX_VALUE MIN_VALUE
1 West NA 30339 NA NA NA
2 West 0 7750 226487.3 4727000 120
3 West 1 32620 212315 4727000 110
4 West 2 74334 258654.5 4727000 110
5 West 3 106532 325764.1 4727000 110
6 West 4 51785 459180.8 4727000 110
7 West 5 12533 607017.6 4727000 120
8 West 6 929 391539.8 2198000 170
9 West 7 555 518111.6 3972000 160
10 West 8 87 478757.2 2221000 230
11 West 9 374 480418.7 2386000 140
12 West 10 1486 975835.3 4727000 250
13 West 19 36 671500 1100000 65000
14 Northeast NA 31319 NA NA NA
15 Northeast 0 4951 311725.2 4775000 170
16 Northeast 1 30030 268071.8 4775000 110
17 Northeast 2 59301 233250.9 4775000 110
18 Northeast 3 89835 262429.5 4775000 110
19 Northeast 4 40622 393485.4 4775000 100
20 Northeast 5 8974 599000.8 4775000 110
21 Northeast 6 38 335772.7 750000 180
22 Northeast 7 273 1305308 4532000 280
23 Northeast 8 2228 864495.4 4775000 200
24 Northeast 9 56 607983.3 2383000 200
25 Northeast 10 642 416859.6 1826000 180
26 Northeast 13 16 298750 550000 150000
27 Midwest NA 32390 NA NA NA
28 Midwest 0 3588 131162.5 1688000 120
29 Midwest 1 24826 100265.2 2381000 110
30 Midwest 2 76965 112534.6 2381000 110
31 Midwest 3 126023 149800.3 2381000 100
32 Midwest 4 51108 229332.6 2381000 100
33 Midwest 5 10804 314773.4 2381000 110
34 Midwest 7 53 282746.7 1548000 1000
35 Midwest 8 812 359498.4 1562000 150
36 Midwest 9 1261 424151.2 2381000 150
37 Midwest 10 318 344710.9 1659000 1000
38 South NA 54208 NA NA NA
39 South 0 6599 132867.8 2518000 110
40 South 1 42047 119018.9 2880000 110
41 South 2 125856 127456.9 3934000 100
42 South 3 227546 168659.6 3934000 100
43 South 4 83899 287290.5 3934000 110
44 South 5 14095 462709.2 3934000 120
45 South 6 4258 545635.4 3934000 130
46 South 7 1027 609865.2 2552000 200
47 South 8 652 681768.1 2738000 250
48 South 9 314 609922.2 2057000 300
49 South 14 19 1996615 3934000 320000

arrange

One last thing. We can arrange a DataFrame as follows.

head(arrange(select(housing_df, "REGION", "VALP"), desc(housing_df$VALP)))
. REGION VALP
1 1 4775000
2 1 4775000
3 1 4775000
4 1 4775000
5 1 4775000
6 1 4775000

Or we can arrange the result of our aggregations.

housing_avg_agg <- agg(
            groupBy(housing_df, "REGION", "BDSP"), 
            NUM_PROPERTIES=n(housing_df$REGION),
            AVG_VALP = avg(housing_df$VALP), 
            MAX_VALUE=max(housing_df$VALP),
            MIN_VALUE=min(housing_df$VALP)
)
housing_avg_sorted <- head(arrange(
           housing_avg_agg,   
           desc(housing_avg_agg$AVG_VALP)
))
    
housing_avg_sorted$REGION <- factor(
        housing_avg_sorted$REGION, 
        levels=c(1,2,3,4,9), 
        labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
housing_avg_sorted
. REGION BDSP NUM_PROPERTIES AVG_VALP MAX_VALUE MIN_VALUE
1 South 14 19 1996615 3934000 320000
2 Northeast 7 273 1305308 4532000 280
3 West 10 1486 975835.3 4727000 250
4 Northeast 8 2228 864495.4 4775000 200
5 South 8 652 681768.1 2738000 250
6 West 19 36 671500 1100000 65000

Conclusions

So that's it. In the next tutorial we will dig deeper into property values (VALP) using these operations and ggplot2 charts, exploring what factors influence the variables in our dataset. In this third tutorial we have introduced most of the tools we need to perform an exploratory data analysis with Spark and R on a large dataset.

And finally, remember that all the code for these series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.
