Spark & R: data frame operations with SparkR
In this third tutorial (see the previous one) we will introduce more advanced SparkSQL concepts with R that you can find in the SparkR documentation, applied to the 2013 American Community Survey housing data. These concepts relate to data frame manipulation, including data slicing, summary statistics, and aggregations, and we will use them in combination with ggplot2 visualisations. We will explain what we do at every step but, if you want to go deeper into ggplot2 for exploratory data analysis, I took this Udacity on-line course in the past and I highly recommend it!
All the code for this series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.
Creating a SparkSQL Context and Loading Data
In order to explore our data, we first need to load it into a SparkSQL data frame, and before that we need to initialise a SparkSQL context. The first thing to do is to set up some environment variables and library paths as follows. Remember to replace the value assigned to SPARK_HOME with your Spark home folder.
# Set Spark home and R libs
Sys.setenv(SPARK_HOME='/home/cluster/spark-1.5.0-bin-hadoop2.6')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
Now we can load the SparkR library as follows.
library(SparkR)
Attaching package: ‘SparkR’
The following objects are masked from ‘package:stats’:
filter, na.omit
The following objects are masked from ‘package:base’:
intersect, rbind, sample, subset, summary, table, transform
And now we can initialise the Spark context as in the official documentation. In our case we use a standalone Spark cluster with one master and seven workers. If you are running Spark in local mode, use just master='local'. Additionally, we require a Spark package from Databricks to read CSV files (more on this in the previous notebook).
sc <- sparkR.init(master='spark://169.254.206.2:7077', sparkPackages="com.databricks:spark-csv_2.11:1.2.0")
Launching java with spark-submit command /home/cluster/spark-1.5.0-bin-hadoop2.6/bin/spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpfRY7gu/backend_port4c2413c05644
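If you don't have a cluster at hand, the same initialisation works in local mode. A minimal sketch, commented out since we already initialised against the cluster:
# Local-mode alternative: 'local[4]' requests four local worker threads
# sc <- sparkR.init(master='local[4]',
#                   sparkPackages='com.databricks:spark-csv_2.11:1.2.0')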
And finally we can start the SparkSQL context as follows.
sqlContext <- sparkRSQL.init(sc)
Now that we have our SparkSQL context ready, we can use it to load our CSV data into data frames. We have downloaded our 2013 American Community Survey dataset files in notebook 0, so they should be stored locally. Remember to set the right path for your data files in the first line; ours is /nfs/data/2013-acs/ss13husa.csv.
housing_a_file_path <- file.path('', 'nfs','data','2013-acs','ss13husa.csv')
housing_b_file_path <- file.path('', 'nfs','data','2013-acs','ss13husb.csv')
Now let's read the data into SparkSQL data frames. We need to pass four parameters in addition to the sqlContext:

- The file path.
- header='true', since our csv files have a header with the column names.
- inferSchema='true', to indicate that we want the library to infer the schema.
- The source type (the Databricks package in this case).

Remember that the housing data comes in two separate files (a and b), so we need to read both and combine them.
housing_a_df <- read.df(sqlContext,
housing_a_file_path,
header='true',
source = "com.databricks.spark.csv",
inferSchema='true')
housing_b_df <- read.df(sqlContext,
housing_b_file_path,
header='true',
source = "com.databricks.spark.csv",
inferSchema='true')
housing_df <- rbind(housing_a_df, housing_b_df)
Let's check that we have everything there by counting the rows and listing a few of them.
nrow(housing_df)
1476313
head(housing_df)
. | RT | SERIALNO | DIVISION | PUMA | REGION | ST | ADJHSG | ADJINC | WGTP | NP | ⋯ | wgtp71 | wgtp72 | wgtp73 | wgtp74 | wgtp75 | wgtp76 | wgtp77 | wgtp78 | wgtp79 | wgtp80 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | H | 84 | 6 | 2600 | 3 | 1 | 1000000 | 1007549 | 0 | 1 | ⋯ | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
2 | H | 154 | 6 | 2500 | 3 | 1 | 1000000 | 1007549 | 51 | 4 | ⋯ | 86 | 53 | 59 | 84 | 49 | 15 | 15 | 20 | 50 | 16 | |
3 | H | 156 | 6 | 1700 | 3 | 1 | 1000000 | 1007549 | 449 | 1 | ⋯ | 161 | 530 | 601 | 579 | 341 | 378 | 387 | 421 | 621 | 486 | |
4 | H | 160 | 6 | 2200 | 3 | 1 | 1000000 | 1007549 | 16 | 3 | ⋯ | 31 | 24 | 33 | 7 | 7 | 13 | 18 | 23 | 23 | 5 | |
5 | H | 231 | 6 | 2400 | 3 | 1 | 1000000 | 1007549 | 52 | 1 | ⋯ | 21 | 18 | 37 | 49 | 103 | 38 | 49 | 51 | 46 | 47 | |
6 | H | 286 | 6 | 900 | 3 | 1 | 1000000 | 1007549 | 76 | 1 | ⋯ | 128 | 25 | 68 | 66 | 80 | 26 | 66 | 164 | 88 | 24 |
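We can also check the schema that was inferred from the CSV header, which will matter later when we compute aggregations.
# Print the inferred schema (column names and types) in tree format
printSchema(housing_df)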
Giving ggplot2 a try
Before we dive into data selection and aggregations, let's try plotting something using ggplot2. We will use this library all the time during our exploratory data analysis, so we'd better make sure we know how to use it with SparkSQL results.
# If ggplot2 isn't installed, run
# install.packages("ggplot2")
# from the R console, specifying a CRAN mirror
library(ggplot2)
What if we try to use our SparkSQL DataFrame directly with ggplot?
c <- ggplot(data=housing_df, aes(x=factor(REGION)))
Error: ggplot2 doesn't know how to deal with data of class DataFrame
Obviously it doesn't work that way. The ggplot function doesn't know how to deal with distributed data frames (the Spark kind). Instead, we need to collect the data locally as follows.
housing_region_df_local <- collect(select(housing_df,"REGION"))
Let's have a look at what we got.
str(housing_region_df_local)
'data.frame': 1476313 obs. of 1 variable:
$ REGION: int 3 3 3 3 3 3 3 3 3 3 ...
That is, when we collect results from a SparkSQL DataFrame, we get a regular R data.frame. Very convenient, since we can manipulate it as we need to. For example, let's convert those int values we have for REGION into a factor with the proper names. From our data dictionary we get the meaning of the REGION variable, as well as the different values it can take.
housing_region_df_local$REGION <- factor(
x=housing_region_df_local$REGION,
levels=c(1,2,3,4,9),
labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
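As a quick sanity check, we can tabulate the samples per level. Note that we must qualify table with base:: because SparkR masks it, as the startup message showed.
# SparkR masks base::table, so call it explicitly to tabulate the
# local factor we just created
base::table(housing_region_df_local$REGION)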
And now we are ready to create the ggplot object as follows.
c <- ggplot(data=housing_region_df_local, aes(x=factor(REGION)))
And now we can give the plot a proper representation (e.g. a bar plot).
c + geom_bar() + xlab("Region")
We will always follow the same approach: first we perform some sort of operation on the SparkSQL DataFrame object (e.g. a selection), then we collect the results, and then we prepare the resulting data.frame to be represented with ggplot2. But think about what we just did: we plotted every sample for a given column. That is almost a million and a half data points, and we are pushing our local R environment and ggplot2 quite hard. In the case of the bar plot we didn't really run into problems, because it aggregates the data internally, but we would struggle to do scatter plots this way. The preferred visualisations will be those built from aggregations on SparkSQL DataFrames, as we will see in further sections.
Data Selection
In this section we will demonstrate how to select data from a SparkSQL DataFrame object using SparkR.
select and selectExpr
We already made use of the select function, but let's have a look at the documentation.
?select
We see there that we have two flavours of select: one that takes a list of column names (the one we have used so far), and another, selectExpr, to which we pass a string containing a SQL expression. Of course, we can pass more than one column name.
collect(select(housing_df, "REGION", "VALP"))
. | REGION | VALP |
---|---|---|
1 | 3 | NA |
2 | 3 | 25000 |
3 | 3 | 80000 |
4 | 3 | NA |
5 | 3 | NA |
6 | 3 | 18000 |
7 | 3 | 390000 |
8 | 3 | 120000 |
9 | 3 | NA |
10 | 3 | 160000 |
11 | 3 | NA |
12 | 3 | NA |
13 | 3 | NA |
14 | 3 | NA |
15 | 3 | 40000 |
16 | 3 | 60000 |
17 | 3 | 60000 |
18 | 3 | NA |
19 | 3 | 250000 |
20 | 3 | 110000 |
21 | 3 | 190000 |
22 | 3 | 160000 |
23 | 3 | NA |
24 | 3 | 750000 |
25 | 3 | 300000 |
26 | 3 | NA |
27 | 3 | NA |
28 | 3 | 100000 |
29 | 3 | 20000 |
30 | 3 | 70000 |
31 | ⋮ | ⋮ |
1476284 | 4 | 130000 |
1476285 | 4 | NA |
1476286 | 4 | NA |
1476287 | 4 | 150000 |
1476288 | 4 | 110000 |
1476289 | 4 | NA |
1476290 | 4 | 200000 |
1476291 | 4 | NA |
1476292 | 4 | 250000 |
1476293 | 4 | NA |
1476294 | 4 | 500000 |
1476295 | 4 | 7100 |
1476296 | 4 | 175000 |
1476297 | 4 | 500000 |
1476298 | 4 | NA |
1476299 | 4 | 25000 |
1476300 | 4 | 150000 |
1476301 | 4 | 240000 |
1476302 | 4 | NA |
1476303 | 4 | NA |
1476304 | 4 | NA |
1476305 | 4 | NA |
1476306 | 4 | 12000 |
1476307 | 4 | 56000 |
1476308 | 4 | 99000 |
1476309 | 4 | NA |
1476310 | 4 | NA |
1476311 | 4 | 150000 |
1476312 | 4 | NA |
1476313 | 4 | NA |
When passing column names we can also use the R notation data.frame$column.name so familiar to R users. How does this notation compare to the name-based one in terms of performance?
system.time(
collect(select(housing_df, housing_df$VALP))
)
user system elapsed
30.086 0.032 48.046
system.time(
collect(select(housing_df, "VALP"))
)
user system elapsed
28.766 0.012 46.358
When using the $ notation, we can even pass expressions as follows.
head(select(housing_df, housing_df$VALP / 100))
. | (VALP / 100.0) |
---|---|
1 | NA |
2 | 250 |
3 | 800 |
4 | NA |
5 | NA |
6 | 180 |
So what's the point of selectExpr then? Well, we can pass more complex SQL expressions. For example.
head(selectExpr(housing_df, "(VALP / 100) as VALP_by_100"))
. | VALP_by_100 |
---|---|
1 | NA |
2 | 250 |
3 | 800 |
4 | NA |
5 | NA |
6 | 180 |
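As a further illustration, here is a sketch with a hypothetical bucketing of property values (VALP_BUCKET is just an illustrative name); selectExpr accepts any expression the SparkSQL parser understands.
# A hypothetical CASE expression bucketing property values
head(selectExpr(housing_df,
                "CASE WHEN VALP < 100000 THEN 'low' ELSE 'high' END as VALP_BUCKET"))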
filter, subset, and sql
The previous functions let us select columns. In order to select rows, we will use filter and subset. Call the docs as follows if you want to know more about them.
?filter
With filter we filter the rows of a DataFrame according to a given condition that we pass as an argument. We can define conditions as SQL condition strings using column names, or by using the $ notation. For example, staying with our property values column, let's select regions and property values higher than 1000.
system.time(
housing_valp_1000 <- collect(filter(select(housing_df, "REGION", "VALP"), "VALP > 1000"))
)
user system elapsed
38.043 0.184 56.259
housing_valp_1000
. | REGION | VALP |
---|---|---|
1 | 3 | 25000 |
2 | 3 | 80000 |
3 | 3 | 18000 |
4 | 3 | 390000 |
5 | 3 | 120000 |
6 | 3 | 160000 |
7 | 3 | 40000 |
8 | 3 | 60000 |
9 | 3 | 60000 |
10 | 3 | 250000 |
11 | 3 | 110000 |
12 | 3 | 190000 |
13 | 3 | 160000 |
14 | 3 | 750000 |
15 | 3 | 300000 |
16 | 3 | 100000 |
17 | 3 | 20000 |
18 | 3 | 70000 |
19 | 3 | 125000 |
20 | 3 | 1843000 |
21 | 3 | 829000 |
22 | 3 | 84000 |
23 | 3 | 150000 |
24 | 3 | 130000 |
25 | 3 | 90000 |
26 | 3 | 220000 |
27 | 3 | 225000 |
28 | 3 | 65000 |
29 | 3 | 80000 |
30 | 3 | 135000 |
31 | ⋮ | ⋮ |
847115 | 4 | 120000 |
847116 | 4 | 200000 |
847117 | 4 | 365000 |
847118 | 4 | 600000 |
847119 | 4 | 100000 |
847120 | 4 | 124000 |
847121 | 4 | 200000 |
847122 | 4 | 160000 |
847123 | 4 | 250000 |
847124 | 4 | 285000 |
847125 | 4 | 100000 |
847126 | 4 | 205000 |
847127 | 4 | 189000 |
847128 | 4 | 350000 |
847129 | 4 | 130000 |
847130 | 4 | 150000 |
847131 | 4 | 110000 |
847132 | 4 | 200000 |
847133 | 4 | 250000 |
847134 | 4 | 500000 |
847135 | 4 | 7100 |
847136 | 4 | 175000 |
847137 | 4 | 500000 |
847138 | 4 | 25000 |
847139 | 4 | 150000 |
847140 | 4 | 240000 |
847141 | 4 | 12000 |
847142 | 4 | 56000 |
847143 | 4 | 99000 |
847144 | 4 | 150000 |
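The same condition can be expressed with the $ notation instead of a SQL string; a minimal sketch that should return the same rows:
# Filter using a Column expression rather than a SQL condition string
housing_sel <- select(housing_df, "REGION", "VALP")
head(filter(housing_sel, housing_sel$VALP > 1000))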
Take into account that we can also perform the previous selection and filtering by using SQL queries against the SparkSQL DataFrame. In order to do that, we need to register the table as follows.
registerTempTable(housing_df, "housing")
And then we can use the SparkR sql function, passing the sqlContext, as follows. Note that this query uses >= 1000 where the previous filter used > 1000, which explains the slightly larger row count below.
system.time(
housing_valp_1000_sql <- collect(sql(sqlContext, "SELECT REGION, VALP FROM housing WHERE VALP >= 1000"))
)
user system elapsed
38.862 0.008 56.747
housing_valp_1000_sql
. | REGION | VALP |
---|---|---|
1 | 3 | 25000 |
2 | 3 | 80000 |
3 | 3 | 18000 |
4 | 3 | 390000 |
5 | 3 | 120000 |
6 | 3 | 160000 |
7 | 3 | 40000 |
8 | 3 | 60000 |
9 | 3 | 60000 |
10 | 3 | 250000 |
11 | 3 | 110000 |
12 | 3 | 190000 |
13 | 3 | 160000 |
14 | 3 | 750000 |
15 | 3 | 300000 |
16 | 3 | 100000 |
17 | 3 | 20000 |
18 | 3 | 70000 |
19 | 3 | 125000 |
20 | 3 | 1843000 |
21 | 3 | 829000 |
22 | 3 | 84000 |
23 | 3 | 150000 |
24 | 3 | 130000 |
25 | 3 | 90000 |
26 | 3 | 220000 |
27 | 3 | 225000 |
28 | 3 | 65000 |
29 | 3 | 80000 |
30 | 3 | 135000 |
31 | ⋮ | ⋮ |
848420 | 4 | 120000 |
848421 | 4 | 200000 |
848422 | 4 | 365000 |
848423 | 4 | 600000 |
848424 | 4 | 100000 |
848425 | 4 | 124000 |
848426 | 4 | 200000 |
848427 | 4 | 160000 |
848428 | 4 | 250000 |
848429 | 4 | 285000 |
848430 | 4 | 100000 |
848431 | 4 | 205000 |
848432 | 4 | 189000 |
848433 | 4 | 350000 |
848434 | 4 | 130000 |
848435 | 4 | 150000 |
848436 | 4 | 110000 |
848437 | 4 | 200000 |
848438 | 4 | 250000 |
848439 | 4 | 500000 |
848440 | 4 | 7100 |
848441 | 4 | 175000 |
848442 | 4 | 500000 |
848443 | 4 | 25000 |
848444 | 4 | 150000 |
848445 | 4 | 240000 |
848446 | 4 | 12000 |
848447 | 4 | 56000 |
848448 | 4 | 99000 |
848449 | 4 | 150000 |
This last method can prove clearer and more flexible when we need to perform complex queries with multiple conditions. Combinations of filter and select can get verbose compared to the clarity of the SQL lingua franca.
But there is another way of subsetting data frames in a functional way, one that is very familiar to R users: the subset function. Just have a look at the help page.
?subset
And we use it as follows.
system.time(
housing_valp_1000_subset <- collect(subset(
housing_df, housing_df$VALP>1000,
c("REGION","VALP")
))
)
user system elapsed
39.751 0.020 57.425
housing_valp_1000_subset
. | REGION | VALP |
---|---|---|
1 | 3 | 25000 |
2 | 3 | 80000 |
3 | 3 | 18000 |
4 | 3 | 390000 |
5 | 3 | 120000 |
6 | 3 | 160000 |
7 | 3 | 40000 |
8 | 3 | 60000 |
9 | 3 | 60000 |
10 | 3 | 250000 |
11 | 3 | 110000 |
12 | 3 | 190000 |
13 | 3 | 160000 |
14 | 3 | 750000 |
15 | 3 | 300000 |
16 | 3 | 100000 |
17 | 3 | 20000 |
18 | 3 | 70000 |
19 | 3 | 125000 |
20 | 3 | 1843000 |
21 | 3 | 829000 |
22 | 3 | 84000 |
23 | 3 | 150000 |
24 | 3 | 130000 |
25 | 3 | 90000 |
26 | 3 | 220000 |
27 | 3 | 225000 |
28 | 3 | 65000 |
29 | 3 | 80000 |
30 | 3 | 135000 |
31 | ⋮ | ⋮ |
847115 | 4 | 120000 |
847116 | 4 | 200000 |
847117 | 4 | 365000 |
847118 | 4 | 600000 |
847119 | 4 | 100000 |
847120 | 4 | 124000 |
847121 | 4 | 200000 |
847122 | 4 | 160000 |
847123 | 4 | 250000 |
847124 | 4 | 285000 |
847125 | 4 | 100000 |
847126 | 4 | 205000 |
847127 | 4 | 189000 |
847128 | 4 | 350000 |
847129 | 4 | 130000 |
847130 | 4 | 150000 |
847131 | 4 | 110000 |
847132 | 4 | 200000 |
847133 | 4 | 250000 |
847134 | 4 | 500000 |
847135 | 4 | 7100 |
847136 | 4 | 175000 |
847137 | 4 | 500000 |
847138 | 4 | 25000 |
847139 | 4 | 150000 |
847140 | 4 | 240000 |
847141 | 4 | 12000 |
847142 | 4 | 56000 |
847143 | 4 | 99000 |
847144 | 4 | 150000 |
What's more, thanks to SparkR we can use the [] notation familiar from R data.frame objects with SparkSQL DataFrames. For example.
system.time(
housing_valp_1000_bracket <- collect(
housing_df[housing_df$VALP>1000, c("REGION","VALP")]
)
)
user system elapsed
39.090 0.013 56.381
housing_valp_1000_bracket
. | REGION | VALP |
---|---|---|
1 | 3 | 25000 |
2 | 3 | 80000 |
3 | 3 | 18000 |
4 | 3 | 390000 |
5 | 3 | 120000 |
6 | 3 | 160000 |
7 | 3 | 40000 |
8 | 3 | 60000 |
9 | 3 | 60000 |
10 | 3 | 250000 |
11 | 3 | 110000 |
12 | 3 | 190000 |
13 | 3 | 160000 |
14 | 3 | 750000 |
15 | 3 | 300000 |
16 | 3 | 100000 |
17 | 3 | 20000 |
18 | 3 | 70000 |
19 | 3 | 125000 |
20 | 3 | 1843000 |
21 | 3 | 829000 |
22 | 3 | 84000 |
23 | 3 | 150000 |
24 | 3 | 130000 |
25 | 3 | 90000 |
26 | 3 | 220000 |
27 | 3 | 225000 |
28 | 3 | 65000 |
29 | 3 | 80000 |
30 | 3 | 135000 |
31 | ⋮ | ⋮ |
847115 | 4 | 120000 |
847116 | 4 | 200000 |
847117 | 4 | 365000 |
847118 | 4 | 600000 |
847119 | 4 | 100000 |
847120 | 4 | 124000 |
847121 | 4 | 200000 |
847122 | 4 | 160000 |
847123 | 4 | 250000 |
847124 | 4 | 285000 |
847125 | 4 | 100000 |
847126 | 4 | 205000 |
847127 | 4 | 189000 |
847128 | 4 | 350000 |
847129 | 4 | 130000 |
847130 | 4 | 150000 |
847131 | 4 | 110000 |
847132 | 4 | 200000 |
847133 | 4 | 250000 |
847134 | 4 | 500000 |
847135 | 4 | 7100 |
847136 | 4 | 175000 |
847137 | 4 | 500000 |
847138 | 4 | 25000 |
847139 | 4 | 150000 |
847140 | 4 | 240000 |
847141 | 4 | 12000 |
847142 | 4 | 56000 |
847143 | 4 | 99000 |
847144 | 4 | 150000 |
That is, we have up to four different ways of subsetting a data frame with SparkR. We can plot any of the previous resulting data frames with a ggplot2 chart as we did before.
housing_valp_1000_bracket$REGION <- factor(
x=housing_valp_1000_bracket$REGION,
levels=c(1,2,3,4,9),
labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
c <- ggplot(data=housing_valp_1000_bracket, aes(x=factor(REGION)))
c + geom_bar() + ggtitle("Samples with VALP>1000") + xlab("Region")
Finally, a function that is useful, especially when imputing missing values in data frames, is isNaN, which can be applied to columns as we do with regular R data frames.
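A minimal sketch, assuming the isNaN column function is available in your SparkR version; for values that the CSV reader loaded as SQL NULLs, isNull is the analogous check:
# Flag NaN entries in the VALP column (isNull would flag SQL NULLs)
head(select(housing_df, isNaN(housing_df$VALP)))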
Data Aggregation and Sorting
In the previous notebook we already had a look at summary/describe, to which we can pass column names in order to get summary statistics. If instead we want to be specific about the statistic we want, SparkR also defines aggregation functions (such as avg, sum, min, max, and n for counting) that we can apply to the columns of DataFrame objects.
We use them by passing columns with the $ notation, and they return columns, so they need to be part of a select call on a DataFrame. For example.
collect(select(housing_df, avg(housing_df$VALP)))
. | avg(VALP) |
---|---|
1 | 247682.8 |
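Several of these functions can be combined in a single select; a sketch:
# Compute several aggregates over the whole DataFrame at once
collect(select(housing_df,
               avg(housing_df$VALP),
               min(housing_df$VALP),
               max(housing_df$VALP)))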
groupBy and summarize / agg
A basic operation when doing data aggregations on data frames is groupBy. Basically, it groups the DataFrame we pass using the specified columns, so we can run aggregations on the groups. We use it in combination with summarize/agg in order to apply aggregation functions. For example, continuing with the previous avg example, let's average property values by region as follows.
housing_avg_valp <- collect(agg(
groupBy(housing_df, "REGION"),
NUM_PROPERTIES=n(housing_df$REGION),
AVG_VALP = avg(housing_df$VALP),
MAX_VALUE=max(housing_df$VALP),
MIN_VALUE=min(housing_df$VALP)
))
housing_avg_valp$REGION <- factor(
housing_avg_valp$REGION,
levels=c(1,2,3,4,9),
labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
housing_avg_valp
. | REGION | NUM_PROPERTIES | AVG_VALP | MAX_VALUE | MIN_VALUE |
---|---|---|---|---|---|
1 | Northeast | 268285 | 314078.1 | 4775000 | 100 |
2 | Midwest | 328148 | 168305.3 | 2381000 | 100 |
3 | South | 560520 | 204236.9 | 3934000 | 100 |
4 | West | 319360 | 365559.3 | 4727000 | 110 |
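This aggregated result is only four rows, so it is exactly the kind of data we said we prefer to hand over to ggplot2; a sketch following the earlier pattern:
# Chart the (small) aggregated result rather than the raw 1.4M samples
c <- ggplot(data=housing_avg_valp, aes(x=REGION, y=AVG_VALP))
c + geom_bar(stat="identity") + xlab("Region") + ylab("Average property value")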
We can add as many summary/aggregation columns as aggregation functions we want to calculate. We can also group by several levels. For example, let's add the number of bedrooms (BDSP in our dictionary) as follows.
housing_avg_valp <- collect(agg(
groupBy(housing_df, "REGION", "BDSP"),
NUM_PROPERTIES=n(housing_df$REGION),
AVG_VALP = avg(housing_df$VALP),
MAX_VALUE=max(housing_df$VALP),
MIN_VALUE=min(housing_df$VALP)
))
housing_avg_valp$REGION <- factor(
housing_avg_valp$REGION,
levels=c(1,2,3,4,9),
labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
housing_avg_valp
. | REGION | BDSP | NUM_PROPERTIES | AVG_VALP | MAX_VALUE | MIN_VALUE |
---|---|---|---|---|---|---|
1 | West | NA | 30339 | NA | NA | NA |
2 | West | 0 | 7750 | 226487.3 | 4727000 | 120 |
3 | West | 1 | 32620 | 212315 | 4727000 | 110 |
4 | West | 2 | 74334 | 258654.5 | 4727000 | 110 |
5 | West | 3 | 106532 | 325764.1 | 4727000 | 110 |
6 | West | 4 | 51785 | 459180.8 | 4727000 | 110 |
7 | West | 5 | 12533 | 607017.6 | 4727000 | 120 |
8 | West | 6 | 929 | 391539.8 | 2198000 | 170 |
9 | West | 7 | 555 | 518111.6 | 3972000 | 160 |
10 | West | 8 | 87 | 478757.2 | 2221000 | 230 |
11 | West | 9 | 374 | 480418.7 | 2386000 | 140 |
12 | West | 10 | 1486 | 975835.3 | 4727000 | 250 |
13 | West | 19 | 36 | 671500 | 1100000 | 65000 |
14 | Northeast | NA | 31319 | NA | NA | NA |
15 | Northeast | 0 | 4951 | 311725.2 | 4775000 | 170 |
16 | Northeast | 1 | 30030 | 268071.8 | 4775000 | 110 |
17 | Northeast | 2 | 59301 | 233250.9 | 4775000 | 110 |
18 | Northeast | 3 | 89835 | 262429.5 | 4775000 | 110 |
19 | Northeast | 4 | 40622 | 393485.4 | 4775000 | 100 |
20 | Northeast | 5 | 8974 | 599000.8 | 4775000 | 110 |
21 | Northeast | 6 | 38 | 335772.7 | 750000 | 180 |
22 | Northeast | 7 | 273 | 1305308 | 4532000 | 280 |
23 | Northeast | 8 | 2228 | 864495.4 | 4775000 | 200 |
24 | Northeast | 9 | 56 | 607983.3 | 2383000 | 200 |
25 | Northeast | 10 | 642 | 416859.6 | 1826000 | 180 |
26 | Northeast | 13 | 16 | 298750 | 550000 | 150000 |
27 | Midwest | NA | 32390 | NA | NA | NA |
28 | Midwest | 0 | 3588 | 131162.5 | 1688000 | 120 |
29 | Midwest | 1 | 24826 | 100265.2 | 2381000 | 110 |
30 | Midwest | 2 | 76965 | 112534.6 | 2381000 | 110 |
31 | Midwest | 3 | 126023 | 149800.3 | 2381000 | 100 |
32 | Midwest | 4 | 51108 | 229332.6 | 2381000 | 100 |
33 | Midwest | 5 | 10804 | 314773.4 | 2381000 | 110 |
34 | Midwest | 7 | 53 | 282746.7 | 1548000 | 1000 |
35 | Midwest | 8 | 812 | 359498.4 | 1562000 | 150 |
36 | Midwest | 9 | 1261 | 424151.2 | 2381000 | 150 |
37 | Midwest | 10 | 318 | 344710.9 | 1659000 | 1000 |
38 | South | NA | 54208 | NA | NA | NA |
39 | South | 0 | 6599 | 132867.8 | 2518000 | 110 |
40 | South | 1 | 42047 | 119018.9 | 2880000 | 110 |
41 | South | 2 | 125856 | 127456.9 | 3934000 | 100 |
42 | South | 3 | 227546 | 168659.6 | 3934000 | 100 |
43 | South | 4 | 83899 | 287290.5 | 3934000 | 110 |
44 | South | 5 | 14095 | 462709.2 | 3934000 | 120 |
45 | South | 6 | 4258 | 545635.4 | 3934000 | 130 |
46 | South | 7 | 1027 | 609865.2 | 2552000 | 200 |
47 | South | 8 | 652 | 681768.1 | 2738000 | 250 |
48 | South | 9 | 314 | 609922.2 | 2057000 | 300 |
49 | South | 14 | 19 | 1996615 | 3934000 | 320000 |
arrange
One last thing: we can sort a DataFrame with arrange as follows.
head(arrange(select(housing_df, "REGION", "VALP"), desc(housing_df$VALP)))
. | REGION | VALP |
---|---|---|
1 | 1 | 4775000 |
2 | 1 | 4775000 |
3 | 1 | 4775000 |
4 | 1 | 4775000 |
5 | 1 | 4775000 |
6 | 1 | 4775000 |
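arrange also accepts several sort columns; a sketch mixing ascending and descending order:
# Sort by region ascending, then property value descending
head(arrange(select(housing_df, "REGION", "VALP"),
             asc(housing_df$REGION), desc(housing_df$VALP)))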
Or we can arrange the result of our aggregations.
housing_avg_agg <- agg(
groupBy(housing_df, "REGION", "BDSP"),
NUM_PROPERTIES=n(housing_df$REGION),
AVG_VALP = avg(housing_df$VALP),
MAX_VALUE=max(housing_df$VALP),
MIN_VALUE=min(housing_df$VALP)
)
housing_avg_sorted <- head(arrange(
housing_avg_agg,
desc(housing_avg_agg$AVG_VALP)
))
housing_avg_sorted$REGION <- factor(
housing_avg_sorted$REGION,
levels=c(1,2,3,4,9),
labels=c('Northeast', 'Midwest','South','West','Puerto Rico')
)
housing_avg_sorted
. | REGION | BDSP | NUM_PROPERTIES | AVG_VALP | MAX_VALUE | MIN_VALUE |
---|---|---|---|---|---|---|
1 | South | 14 | 19 | 1996615 | 3934000 | 320000 |
2 | Northeast | 7 | 273 | 1305308 | 4532000 | 280 |
3 | West | 10 | 1486 | 975835.3 | 4727000 | 250 |
4 | Northeast | 8 | 2228 | 864495.4 | 4775000 | 200 |
5 | South | 8 | 652 | 681768.1 | 2738000 | 250 |
6 | West | 19 | 36 | 671500 | 1100000 | 65000 |
Conclusions
So that's it. In the next tutorial we will dig deeper into property values (VALP) using these operations and ggplot2 charts. We want to explore what factors influence the variables in our dataset. In this third tutorial we have introduced most of the tools we need in order to perform an exploratory data analysis of a large dataset using Spark and R.
And finally, remember that all the code for this series of Spark and R tutorials can be found in its own GitHub repository. Go there and make it yours.