Spark & Python: MLlib Logistic Regression
Instructions
My Spark & Python series of tutorials can be examined individually, although there is a more or less linear 'story' when followed in sequence. By using the same dataset they try to solve a related set of tasks with it.
It is not the only one but, a good way of following these Spark tutorials is by first cloning the GitHub repo, and then starting your own IPython notebook in pySpark mode. For example, if we have a standalone Spark installation running in our localhost
with a maximum of 6Gb per node assigned to IPython:
MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark
Notice that the path to the pyspark
command will depend on your specific installation. So as a requirement, you need to have Spark installed in the same machine you are going to start the IPython notebook
server.
For more Spark options see here. In general it works the rule of passign options described in the form spark.executor.memory
as SPARK_EXECUTOR_MEMORY
when calling IPython/pySpark.
Datasets
We will be using datasets from the KDD Cup 1999.
References
The reference book for these and other Spark related topics is Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.
The KDD Cup 1999 competition dataset is described in detail here.
Introduction
In this tutorial we will use Spark's machine learning library MLlib to build a Logistic Regression classifier for network attack detection. We will use the complete KDD Cup 1999 datasets in order to test Spark capabilities with large datasets.
Additionally, we will introduce two ways of performing model selection: by using a correlation matrix and by using hypothesis testing.
For your reference when showing execution times, at the time of processing this tutorial, our Spark cluster contains:
- Eight nodes, with one of them acting as master and the rest as workers (for a total of 14 execution cores available).
- Each node contains 8Gb of RAM, with 6Gb being used for each node.
- Each node has a 2.4Ghz Intel dual core processor.
Getting the data and creating the RDD
As we said, this time we will use the complete dataset provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)
print "Train data size is {}".format(raw_data.count())
Train data size is 4898431
The KDD Cup 1999 also provide test data that we will load in a separate RDD.
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)
print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029
Labeled Points
A labeled point is a local vector associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms and they are stored as doubles. For binary classification, a label should be either 0 (negative) or 1 (positive).
Preparing the training data
In our case, we are interested in detecting network attacks in general. We don't need to detect which type of attack we are dealing with. Therefore we will tag each network interaction as non attack (i.e. 'normal' tag) or attack (i.e. anything else but 'normal').
from pyspark.mllib.regression import LabeledPoint
from numpy import array
def parse_interaction(line):
line_split = line.split(",")
# leave_out = [1,2,3,41]
clean_line_split = line_split[0:1]+line_split[4:41]
attack = 1.0
if line_split[41]=='normal.':
attack = 0.0
return LabeledPoint(attack, array([float(x) for x in clean_line_split]))
training_data = raw_data.map(parse_interaction)
Preparing the test data
Similarly, we process our test data file.
test_data = test_raw_data.map(parse_interaction)
Detecting network attacks using Logistic Regression
Logistic regression is widely used to predict a binary response. Spark implements two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. L-BFGS is recommended over mini-batch gradient descent for faster convergence.
Training a classifier
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time
# Build the model
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0
print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 365.599 seconds
Evaluating the model on new data
In order to measure the classification error on our test data, we use map
on the test_data
RDD and the model to predict each test point class.
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))
Classification results are returned in pars, with the actual test label and the predicted one. This is used to calculate the classification error by using filter
and count
as follows.
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0
print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.291 seconds. Test accuracy is 0.9164
That's a decent accuracy. We know that there is space for improvement with a better variable selection and also by including categorical variables (e.g. we have excluded 'protocol' and 'service').
Model selection
Model or feature selection helps us building more interpretable and efficient models (or a classifier in this case). For ilustrative purposes, we will follow two different approaches, correlation matrices and hypothesis testing.
Using a correlation matrix
In a previous notebook we calculated a correlation matrix in order to find predictors that are highly correlated. There are many possitble choices there in order to simplify our model. We can pick differen combinations of correlated variables and leave just those that represent them. The reader can try different combinations. Here we will choose the following for illustrative purposes:
-
From the description of the KDD Cup 99 task we know that the variable
dst_host_same_src_port_rate
references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliar dataframes) we find that this one is highly and positively correlated tosrc_bytes
andsrv_count
. The former is the number of bytes sent form source to destination. The later is the number of connections to the same service as the current connection in the past 2 seconds. We decide not to includedst_host_same_src_port_rate
in our model since we include the other two. -
Variables
serror_rate
andsrv_error_rate
(% of connections that have SYN errors for same host and same service respectively) are highly positively correlated. Moreover, the set of variables that they highly correlate with are pretty much the same. They look like contributing very similarly to our model. We will keep justserror_rate
. -
A similar situation happens with
rerror_rate
andsrv_rerror_rate
(% of connections that have REJ errors) so we will keep justrerror_rate
. -
Same thing with variables prefixed with
dst_host_
for the previous ones (e.g.dst_host_srv_serror_rate
).
We will stop here, although the reader can keep experimenting removing correlated variables has before (e.g. same_srv_rate
and diff_srv_rate
are good candidates. Our list of variables we will drop includes:
dst_host_same_src_port_rate
, (column 35).srv_serror_rate
(column 25).srv_rerror_rate
(column 27).dst_host_srv_serror_rate
(column 38).dst_host_srv_rerror_rate
(column 40).
Evaluating the new model
Let's proceed with the evaluation of our reduced model. First we need to provide training and testing datasets containing just the selected variables. For that we will define a new function ot parse th raw data that keeps just what we need.
def parse_interaction_corr(line):
line_split = line.split(",")
# leave_out = [1,2,3,25,27,35,38,40,41]
clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
attack = 1.0
if line_split[41]=='normal.':
attack = 0.0
return LabeledPoint(attack, array([float(x) for x in clean_line_split]))
corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)
Note: when selecting elements in the split, a list comprehension with a leave_out
list for filtering is more Pythonic than slicing and concatenation indeed, but we have found it less efficient. This is very important when dealing with large datasets. The parse_interaction
functions will be called for every element in the RDD, so we need to make them as efficient as possible.
Now we can train the model.
# Build the model
t0 = time()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = time() - t0
print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 319.124 seconds
And evaluate its accuracy on the test data.
labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(corr_reduced_test_data.count())
tt = time() - t0
print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.102 seconds. Test accuracy is 0.8599
As expected, we have reduced accuracy and also training time. However this doesn't seem a good trade! At least not for logistic regression and considering the predictors we decided to leave out. We have lost quite a lot of accuracy and have not gained a lot of execution time during training. Moreoever prediction time didn't improve.
Using hypothesis testing
Hypothesis testing is a powerful tool in statistical inference and learning to determine whether a result is statistically significant. MLlib supports Pearson's chi-squared ( χ2) tests for goodness of fit and independence. The goodness of fit test requires an input type of Vector
, whereas the independence test requires a Matrix
as input. Moreover, MLlib also supports the input type RDD[LabeledPoint]
to enable feature selection via chi-squared independence tests. Again, these methods are part of the Statistics
package.
In our case we want to perform some sort of feature selection, so we will provide an RDD of LabeledPoint
. Internally, MLlib will calculate a contingency matrix and perform the Persons's chi-squared (χ2) test. Features need to be categorical. Real-valued features will be treated as categorical in each of its different values. There is a limit of 1000 different values, so we need either to leave out some features or categorise them. In this case, we will consider just features that either take boolean values or just a few different numeric values in our dataset. We could overcome this limitation by defining a more complex parse_interaction
function that categorises each feature properly.
feature_names = ["land","wrong_fragment",
"urgent","hot","num_failed_logins",
"logged_in","num_compromised",
"root_shell","su_attempted",
"num_root","num_file_creations",
"num_shells","num_access_files",
"num_outbound_cmds",
"is_hot_login","is_guest_login",
"count","srv_count","serror_rate",
"srv_serror_rate","rerror_rate",
"srv_rerror_rate","same_srv_rate",
"diff_srv_rate","srv_diff_host_rate",
"dst_host_count","dst_host_srv_count",
"dst_host_same_srv_rate","dst_host_diff_srv_rate",
"dst_host_same_src_port_rate",
"dst_host_srv_diff_host_rate","dst_host_serror_rate",
"dst_host_srv_serror_rate",
"dst_host_rerror_rate","dst_host_srv_rerror_rate"]
def parse_interaction_categorical(line):
line_split = line.split(",")
clean_line_split = line_split[6:41]
attack = 1.0
if line_split[41]=='normal.':
attack = 0.0
return LabeledPoint(attack,
array([float(x) for x in clean_line_split]))
training_data_categorical = raw_data.map(parse_interaction_categorical)
from pyspark.mllib.stat import Statistics
chi = Statistics.chiSqTest(training_data_categorical)
Now we can check the resulting values after putting them into a Pandas data frame.
import pandas as pd
pd.set_option('display.max_colwidth', 30)
records = [(result.statistic, result.pValue) for result in chi]
chi_df = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","p-value"])
chi_df
label | statistic | p-value |
---|---|---|
land |
0.464984 | 0.495304 |
wrong_fragment |
306.855508 | 0.000000 |
urgent |
38.718442 | 0.000000 |
hot |
19463.314328 | 0.000000 |
num_failed_logins |
127.769106 | 0.000000 |
logged_in |
3273098.055702 | 0.000000 |
num_compromised |
2011.862724 | 0.000000 |
root_shell |
1044.917853 | 0.000000 |
su_attempted |
433.999968 | 0.000000 |
num_root |
22871.675646 | 0.000000 |
num_file_creations |
9179.739210 | 0.000000 |
num_shells |
1380.027801 | 0.000000 |
num_access_files |
18734.770753 | 0.000000 |
num_outbound_cmds |
0.000000 | 1.000000 |
is_hot_login |
8.070987 | 0.004498 |
is_guest_login |
13500.511066 | 0.000000 |
count |
4546398.359513 | 0.000000 |
srv_count |
2296059.697747 | 0.000000 |
serror_rate |
268419.919232 | 0.000000 |
srv_serror_rate |
302626.967659 | 0.000000 |
rerror_rate |
9860.453313 | 0.000000 |
srv_rerror_rate |
32476.385256 | 0.000000 |
same_srv_rate |
399912.444046 | 0.000000 |
diff_srv_rate |
390999.770051 | 0.000000 |
srv_diff_host_rate |
1365458.716670 | 0.000000 |
dst_host_count |
2520478.944345 | 0.000000 |
dst_host_srv_count |
1439086.154193 | 0.000000 |
dst_host_same_srv_rate |
1237931.905888 | 0.000000 |
dst_host_diff_srv_rate |
1339002.406545 | 0.000000 |
dst_host_same_src_port_rate |
2915195.296767 | 0.000000 |
dst_host_srv_diff_host_rate |
2226290.874366 | 0.000000 |
dst_host_serror_rate |
407454.601492 | 0.000000 |
dst_host_srv_serror_rate |
455099.001618 | 0.000000 |
dst_host_rerror_rate |
136478.956325 | 0.000000 |
dst_host_srv_rerror_rate |
254547.369785 | 0.000000 |
From that we conclude that predictors land
and num_outbound_cmds
could be removed from our model without affecting our accuracy dramatically. Let's try this.
Evaluating the new model
So the only modification to our first parse_interaction
function will be to remove columns 6 and 19, corresponding to the two predictors that we want not to be part of our model.
def parse_interaction_chi(line):
line_split = line.split(",")
# leave_out = [1,2,3,6,19,41]
clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41]
attack = 1.0
if line_split[41]=='normal.':
attack = 0.0
return LabeledPoint(attack, array([float(x) for x in clean_line_split]))
training_data_chi = raw_data.map(parse_interaction_chi)
test_data_chi = test_raw_data.map(parse_interaction_chi)
Now we build the logistic regression classifier again.
# Build the model
t0 = time()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi)
tt = time() - t0
print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 356.507 seconds
And evaluate in test data.
labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data_chi.count())
tt = time() - t0
print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.656 seconds. Test accuracy is 0.9164
So we can see that, by using hypothesis testing, we have been able to remove two predictors without diminishing testing accuracy at all. Training time improved a bit as well. This might now seem like a big model reduction, but it is something when dealing with big data sources. Moreover, we should be able to categorise those five predictors we have left out for different reasons and, either include them in the model or leave them out if they aren't statistically significant.
Additionally, we could try to remove some of those predictors that are highly correlated, trying not to reduce accuracy too much. In the end, we should end up with a model easier to understand and use.
Hello,
I’m running spark on python 3. The line below produces an error.
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
According to my google search, python 3 does not take tuple as an input so for python 3 users, this code below should work
test_accuracy = labels_and_preds.filter(lambda vp: vp[0] == vp[1]).count() / float(test_data.count())
Excuse my variable naming due to my ignorance.