Spark & Python: Working with RDDs (II)
Instructions
The tutorials in my Spark & Python series can be read individually, although they tell a more or less linear 'story' when followed in sequence. They all use the same dataset to solve a related set of tasks.
A good way of following these Spark tutorials, though not the only one, is to first clone the GitHub repo and then start your own IPython notebook in pySpark mode. For example, if we have a standalone Spark installation running on localhost, with a maximum of 6 GB per node assigned to IPython:
MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark
Notice that the path to the pyspark command will depend on your specific installation. As a requirement, Spark must be installed on the same machine where you are going to start the IPython notebook server.
For more Spark options see here. As a general rule, an option described in the form spark.executor.memory is passed as SPARK_EXECUTOR_MEMORY when calling IPython/pySpark.
Datasets
We will be using datasets from the KDD Cup 1999.
References
The reference book for these and other Spark-related topics is Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.
The KDD Cup 1999 competition dataset is described in detail here.
Data Aggregations on RDDs
We can aggregate RDD data in Spark by using three different actions: reduce, fold, and aggregate. The last one is the most general and in some way includes the first two.
Getting the Data and Creating the RDD
In this section we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
Inspecting Interaction Duration by Tag
Both fold and reduce take a function as an argument that is applied to two elements of the RDD. The fold action differs from reduce in that it takes an additional initial zero value to be used for the initial call. This value should be the identity element for the function provided.
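As a rough, Spark-free illustration of the difference, the sketch below mimics both actions in plain Python. The nested lists standing in for partitions and the helper names `local_reduce` and `local_fold` are assumptions made for this example and are not part of the PySpark API. It only intends to show why the zero value should be the identity element: it is applied once per partition and once more when the partial results are merged.

```python
from functools import reduce

# Hypothetical stand-in for an RDD split into three partitions.
partitions = [[1, 2, 3], [4, 5], [6]]

def local_reduce(func, parts):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in parts if part]
    return reduce(func, partials)

def local_fold(zero, func, parts):
    """Fold each partition starting from zero, then fold the partial results."""
    partials = [reduce(func, part, zero) for part in parts]
    return reduce(func, partials, zero)

add = lambda x, y: x + y

total_reduce = local_reduce(add, partitions)
total_fold = local_fold(0, add, partitions)   # 0 is the identity for +
skewed_fold = local_fold(1, add, partitions)  # 1 is not, so it leaks into the sum

print(total_reduce, total_fold, skewed_fold)
```

With a zero value of 1 the result is off by the number of partitions plus one, which is why a non-identity zero value gives results that depend on how the data happens to be partitioned.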
As an example, imagine we want to know the total duration of normal interactions and of attack interactions. We can use reduce as follows.
# parse data
csv_data = raw_data.map(lambda x: x.split(","))
# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")
The function that we pass to reduce takes and returns elements of the same type as those in the RDD. If we want to sum durations, we need to extract that element into a new RDD.
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))
Now we can reduce these new RDDs.
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)
print "Total duration for 'normal' interactions is {}".\
format(total_normal_duration)
print "Total duration for 'attack' interactions is {}".\
format(total_attack_duration)
Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792
We can go further and use counts to calculate duration means.
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()
print "Mean duration for 'normal' interactions is {}".\
format(round(total_normal_duration/float(normal_count),3))
print "Mean duration for 'attack' interactions is {}".\
format(round(total_attack_duration/float(attack_count),3))
Mean duration for 'normal' interactions is 216.657
Mean duration for 'attack' interactions is 6.621
This gives us a first (and overly simplistic) way to identify attack interactions.
A Better Way, Using aggregate
The aggregate action frees us from the constraint of having the return value be the same type as the elements of the RDD we are working on. As with fold, we supply an initial zero value of the type we want to return. Then we provide two functions. The first one is used to combine the elements from our RDD with the accumulator. The second one is needed to merge two accumulators. Let's see it in action by calculating the mean we computed before.
normal_sum_count = normal_duration_data.aggregate(
(0,0), # the initial value
(lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine val/acc
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
)
print "Mean duration for 'normal' interactions is {}".\
format(round(normal_sum_count[0]/float(normal_sum_count[1]),3))
Mean duration for 'normal' interactions is 216.657
In the previous aggregation, the first element of the accumulator keeps the running sum, while the second element keeps the count. Combining an accumulator with an RDD element consists of adding the value to the sum and incrementing the count. Combining two accumulators requires just a pairwise sum.
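The mechanics just described can be imitated without Spark. The following is a minimal sketch in plain Python, assuming a toy list of durations split into two made-up partitions; `local_aggregate` is a hypothetical helper written for this illustration, not a PySpark function. The first function runs inside each partition and the second one merges the per-partition accumulators.

```python
from functools import reduce

# Hypothetical stand-in for an RDD of durations split into two partitions.
partitions = [[0, 10, 20], [30, 40]]

def local_aggregate(zero, seq_op, comb_op, parts):
    """Apply seq_op within each partition, then comb_op across partitions."""
    partials = [reduce(seq_op, part, zero) for part in parts]
    return reduce(comb_op, partials, zero)

sum_count = local_aggregate(
    (0, 0),                                           # (sum, count) accumulator
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # combine value with acc
    lambda a, b: (a[0] + b[0], a[1] + b[1]),          # combine accumulators
    partitions,
)
mean = sum_count[0] / float(sum_count[1])

print(sum_count, mean)
```

Note that the accumulator is a pair while the elements are plain numbers, which is exactly what reduce and fold do not allow.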
We can do the same with attack type interactions.
attack_sum_count = attack_duration_data.aggregate(
(0,0), # the initial value
(lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)
print "Mean duration for 'attack' interactions is {}".\
format(round(attack_sum_count[0]/float(attack_sum_count[1]),3))
Mean duration for 'attack' interactions is 6.621
Working with Key/Value Pair RDDs
Spark provides specific functions to deal with RDDs whose elements are key/value pairs. They are usually used to perform aggregations and other processing by key.
In this section we will show how, by working with key/value pairs, we can process our network interactions dataset in a more practical and powerful way than that used in previous notebooks. Key/value pair aggregations will prove particularly effective when exploring each type of tag in our network attacks individually.
Getting the Data and Creating the RDD
As we did in our first section, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
Creating a Pair RDD for Interaction Types
In this notebook we want to do some exploratory data analysis on our network interactions dataset. More concretely, we want to profile each network interaction type in terms of some of its variables, such as duration. In order to do so, we first need to create an RDD suitable for that, where each interaction is parsed as a CSV row representing the value and is paired with its corresponding tag as the key.
Normally we create key/value pair RDDs by applying a function to the original data using map. This function returns the corresponding pair for a given RDD element. We can proceed as follows.
csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag
We now have our key/value pair data ready to be used. Let's get the first element to see what it looks like.
key_value_data.take(1)
[(u'normal.',
[u'0',
u'tcp',
u'http',
u'SF',
u'181',
u'5450',
u'0',
u'0',
u'0',
u'0',
u'0',
u'1',
u'0',
u'0',
u'0',
u'0',
u'0',
u'0',
u'0',
u'0',
u'0',
u'0',
u'8',
u'8',
u'0.00',
u'0.00',
u'0.00',
u'0.00',
u'1.00',
u'0.00',
u'0.00',
u'9',
u'9',
u'1.00',
u'0.00',
u'0.11',
u'0.00',
u'0.00',
u'0.00',
u'0.00',
u'0.00',
u'normal.'])]
Data Aggregations with Key/Value Pair RDDs
We can use all the transformations and actions available for normal RDDs with key/value pair RDDs. We just need to make the functions work with pair elements. Additionally, Spark provides specific functions to work with RDDs containing pair elements. They are very similar to those available for general RDDs.
For example, we have a reduceByKey transformation that we can use as follows to calculate the total duration of each network interaction type.
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0])))
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)
durations_by_key.collect()
[(u'guess_passwd.', 144.0),
(u'nmap.', 0.0),
(u'warezmaster.', 301.0),
(u'rootkit.', 1008.0),
(u'warezclient.', 627563.0),
(u'smurf.', 0.0),
(u'pod.', 0.0),
(u'neptune.', 0.0),
(u'normal.', 21075991.0),
(u'spy.', 636.0),
(u'ftp_write.', 259.0),
(u'phf.', 18.0),
(u'portsweep.', 1991911.0),
(u'teardrop.', 0.0),
(u'buffer_overflow.', 2751.0),
(u'land.', 0.0),
(u'imap.', 72.0),
(u'loadmodule.', 326.0),
(u'perl.', 124.0),
(u'multihop.', 1288.0),
(u'back.', 284.0),
(u'ipsweep.', 43.0),
(u'satan.', 64.0)]
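To make the per-key merging more tangible, here is a small plain-Python sketch of what a reduce-by-key does conceptually. The toy pairs and the helper `local_reduce_by_key` are assumptions for illustration only; the real transformation is distributed and lazy, which this sketch does not attempt to reproduce.

```python
# Hypothetical (tag, duration) pairs standing in for key_value_duration.
pairs = [("normal.", 5.0), ("smurf.", 0.0), ("normal.", 7.0), ("back.", 2.0)]

def local_reduce_by_key(func, kv_pairs):
    """Merge all the values that share a key using func."""
    merged = {}
    for key, value in kv_pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value  # first value seen for this key
    return merged

durations = local_reduce_by_key(lambda x, y: x + y, pairs)

print(sorted(durations.items()))
```

As with reduce, the merging function receives and returns values of the same type as the values in the pairs.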
We have a specific counting action for key/value pairs.
counts_by_key = key_value_data.countByKey()
counts_by_key
defaultdict(<type 'int'>, {u'guess_passwd.': 53, u'nmap.': 231, u'warezmaster.': 20, u'rootkit.': 10, u'warezclient.': 1020, u'smurf.': 280790, u'pod.': 264, u'neptune.': 107201, u'normal.': 97278, u'spy.': 2, u'ftp_write.': 8, u'phf.': 4, u'portsweep.': 1040, u'teardrop.': 979, u'buffer_overflow.': 30, u'land.': 21, u'imap.': 12, u'loadmodule.': 9, u'perl.': 3, u'multihop.': 7, u'back.': 2203, u'ipsweep.': 1247, u'satan.': 1589})
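Unlike reduceByKey, countByKey is an action: as the output above shows, it hands back a plain dictionary rather than another RDD. A loose plain-Python analogue, using made-up pairs and the standard library `Counter`, could look like this; it is only meant to show that the values are ignored and just the keys are tallied.

```python
from collections import Counter

# Hypothetical (tag, row) pairs standing in for key_value_data.
pairs = [
    ("normal.", ["0", "tcp"]),
    ("smurf.", ["0", "icmp"]),
    ("normal.", ["2", "udp"]),
]

# Only the key of each pair contributes to the tally.
counts = Counter(key for key, _ in pairs)

print(dict(counts))
```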
Using combineByKey
This is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. We can think of it as the per-key equivalent of aggregate, since it allows the user to return values that are not the same type as our input data.
For example, we can use it to calculate per-type average durations as follows.
sum_counts = key_value_duration.combineByKey(
(lambda x: (x, 1)), # the initial value, with value x and count 1
(lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
(lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)
sum_counts.collectAsMap()
{u'back.': (284.0, 2203),
u'buffer_overflow.': (2751.0, 30),
u'ftp_write.': (259.0, 8),
u'guess_passwd.': (144.0, 53),
u'imap.': (72.0, 12),
u'ipsweep.': (43.0, 1247),
u'land.': (0.0, 21),
u'loadmodule.': (326.0, 9),
u'multihop.': (1288.0, 7),
u'neptune.': (0.0, 107201),
u'nmap.': (0.0, 231),
u'normal.': (21075991.0, 97278),
u'perl.': (124.0, 3),
u'phf.': (18.0, 4),
u'pod.': (0.0, 264),
u'portsweep.': (1991911.0, 1040),
u'rootkit.': (1008.0, 10),
u'satan.': (64.0, 1589),
u'smurf.': (0.0, 280790),
u'spy.': (636.0, 2),
u'teardrop.': (0.0, 979),
u'warezclient.': (627563.0, 1020),
u'warezmaster.': (301.0, 20)}
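The role of each of the three functions may be easier to see in a Spark-free sketch. The code below assumes two made-up partitions of (tag, duration) pairs, and `local_combine_by_key` is a hypothetical helper, not the PySpark implementation. The first function creates an accumulator the first time a key is seen in a partition, the second merges further values into it, and the third merges accumulators for the same key coming from different partitions.

```python
def local_combine_by_key(create, merge_value, merge_combiners, parts):
    """Mimic a combine-by-key over a list of partitions of (key, value) pairs."""
    per_partition = []
    for part in parts:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create(value)  # first time key is seen here
        per_partition.append(combiners)

    # Merge the accumulators that different partitions built for the same key.
    result = {}
    for combiners in per_partition:
        for key, comb in combiners.items():
            if key in result:
                result[key] = merge_combiners(result[key], comb)
            else:
                result[key] = comb
    return result

# Hypothetical data: two partitions of (tag, duration) pairs.
partitions = [
    [("normal.", 10.0), ("back.", 1.0), ("normal.", 20.0)],
    [("normal.", 30.0), ("back.", 3.0)],
]

toy_sum_counts = local_combine_by_key(
    lambda x: (x, 1),                                 # create accumulator
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # merge a value into it
    lambda a, b: (a[0] + b[0], a[1] + b[1]),          # merge two accumulators
    partitions,
)

print(toy_sum_counts)
```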
We can see that the arguments are pretty similar to those passed to aggregate in the previous notebook. The result associated with each type is in the form of a pair. If we want to actually get the averages, we need to do the division before collecting the results.
# divide each per-type sum by its count; kv is a (tag, (sum, count)) pair
duration_means_by_type = sum_counts.map(lambda kv: (kv[0], round(kv[1][0]/kv[1][1],3))).collectAsMap()
# Print them sorted by mean duration, largest first
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print tag, duration_means_by_type[tag]
portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
nmap. 0.0
smurf. 0.0
pod. 0.0
neptune. 0.0
teardrop. 0.0
land. 0.0
This is a small step towards understanding what makes a network interaction be considered an attack.