Codementor Events

Spark & Python: MLlib Logistic Regression

Published Jul 03, 2015Last updated Feb 10, 2017

Instructions

My Spark & Python series of tutorials can be examined individually, although there is a more or less linear 'story' when followed in sequence. By using the same dataset they try to solve a related set of tasks with it.

It is not the only one but, a good way of following these Spark tutorials is by first cloning the GitHub repo, and then starting your own IPython notebook in pySpark mode. For example, if we have a standalone Spark installation running in our localhost with a maximum of 6Gb per node assigned to IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Notice that the path to the pyspark command will depend on your specific installation. So as a requirement, you need to have Spark installed in the same machine you are going to start the IPython notebook server.

For more Spark options see here. In general it works the rule of passign options described in the form spark.executor.memory as SPARK_EXECUTOR_MEMORY when calling IPython/pySpark.

Datasets

We will be using datasets from the KDD Cup 1999.

References

The reference book for these and other Spark related topics is Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.

The KDD Cup 1999 competition dataset is described in detail here.

Introduction

In this tutorial we will use Spark's machine learning library MLlib to build a Logistic Regression classifier for network attack detection. We will use the complete KDD Cup 1999 datasets in order to test Spark capabilities with large datasets.

Additionally, we will introduce two ways of performing model selection: by using a correlation matrix and by using hypothesis testing.

For your reference when showing execution times, at the time of processing this tutorial, our Spark cluster contains:

  • Eight nodes, with one of them acting as master and the rest as workers (for a total of 14 execution cores available).
  • Each node contains 8Gb of RAM, with 6Gb being used for each node.
  • Each node has a 2.4Ghz Intel dual core processor.

Getting the data and creating the RDD

As we said, this time we will use the complete dataset provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.

import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")


data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())
Train data size is 4898431

The KDD Cup 1999 also provide test data that we will load in a separate RDD.

ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")


test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029

Labeled Points

A labeled point is a local vector associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms and they are stored as doubles. For binary classification, a label should be either 0 (negative) or 1 (positive).

Preparing the training data

In our case, we are interested in detecting network attacks in general. We don't need to detect which type of attack we are dealing with. Therefore we will tag each network interaction as non attack (i.e. 'normal' tag) or attack (i.e. anything else but 'normal').

from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,41]
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = raw_data.map(parse_interaction)

Preparing the test data

Similarly, we process our test data file.

test_data = test_raw_data.map(parse_interaction)

Detecting network attacks using Logistic Regression

Logistic regression is widely used to predict a binary response. Spark implements two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. L-BFGS is recommended over mini-batch gradient descent for faster convergence.

Training a classifier

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Build the model
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 365.599 seconds

Evaluating the model on new data

In order to measure the classification error on our test data, we use map on the test_data RDD and the model to predict each test point class.

labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

Classification results are returned in pars, with the actual test label and the predicted one. This is used to calculate the classification error by using filter and count as follows.

t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.291 seconds. Test accuracy is 0.9164

That's a decent accuracy. We know that there is space for improvement with a better variable selection and also by including categorical variables (e.g. we have excluded 'protocol' and 'service').

Model selection

Model or feature selection helps us building more interpretable and efficient models (or a classifier in this case). For ilustrative purposes, we will follow two different approaches, correlation matrices and hypothesis testing.

Using a correlation matrix

In a previous notebook we calculated a correlation matrix in order to find predictors that are highly correlated. There are many possitble choices there in order to simplify our model. We can pick differen combinations of correlated variables and leave just those that represent them. The reader can try different combinations. Here we will choose the following for illustrative purposes:

  • From the description of the KDD Cup 99 task we know that the variable dst_host_same_src_port_rate references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliar dataframes) we find that this one is highly and positively correlated to src_bytes and srv_count. The former is the number of bytes sent form source to destination. The later is the number of connections to the same service as the current connection in the past 2 seconds. We decide not to include dst_host_same_src_port_rate in our model since we include the other two.

  • Variables serror_rate and srv_error_rate (% of connections that have SYN errors for same host and same service respectively) are highly positively correlated. Moreover, the set of variables that they highly correlate with are pretty much the same. They look like contributing very similarly to our model. We will keep just serror_rate.

  • A similar situation happens with rerror_rate and srv_rerror_rate (% of connections that have REJ errors) so we will keep just rerror_rate.

  • Same thing with variables prefixed with dst_host_ for the previous ones (e.g. dst_host_srv_serror_rate).

We will stop here, although the reader can keep experimenting removing correlated variables has before (e.g. same_srv_rate and diff_srv_rate are good candidates. Our list of variables we will drop includes:

  • dst_host_same_src_port_rate, (column 35).
  • srv_serror_rate (column 25).
  • srv_rerror_rate (column 27).
  • dst_host_srv_serror_rate (column 38).
  • dst_host_srv_rerror_rate (column 40).

Evaluating the new model

Let's proceed with the evaluation of our reduced model. First we need to provide training and testing datasets containing just the selected variables. For that we will define a new function ot parse th raw data that keeps just what we need.

def parse_interaction_corr(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,25,27,35,38,40,41]
    clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)

Note: when selecting elements in the split, a list comprehension with a leave_out list for filtering is more Pythonic than slicing and concatenation indeed, but we have found it less efficient. This is very important when dealing with large datasets. The parse_interaction functions will be called for every element in the RDD, so we need to make them as efficient as possible.

Now we can train the model.

# Build the model
t0 = time()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 319.124 seconds

And evaluate its accuracy on the test data.

labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(corr_reduced_test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.102 seconds. Test accuracy is 0.8599

As expected, we have reduced accuracy and also training time. However this doesn't seem a good trade! At least not for logistic regression and considering the predictors we decided to leave out. We have lost quite a lot of accuracy and have not gained a lot of execution time during training. Moreoever prediction time didn't improve.

Using hypothesis testing

Hypothesis testing is a powerful tool in statistical inference and learning to determine whether a result is statistically significant. MLlib supports Pearson's chi-squared ( χ2) tests for goodness of fit and independence. The goodness of fit test requires an input type of Vector, whereas the independence test requires a Matrix as input. Moreover, MLlib also supports the input type RDD[LabeledPoint] to enable feature selection via chi-squared independence tests. Again, these methods are part of the Statistics package.

In our case we want to perform some sort of feature selection, so we will provide an RDD of LabeledPoint. Internally, MLlib will calculate a contingency matrix and perform the Persons's chi-squared (χ2) test. Features need to be categorical. Real-valued features will be treated as categorical in each of its different values. There is a limit of 1000 different values, so we need either to leave out some features or categorise them. In this case, we will consider just features that either take boolean values or just a few different numeric values in our dataset. We could overcome this limitation by defining a more complex parse_interaction function that categorises each feature properly.

feature_names = ["land","wrong_fragment",
             "urgent","hot","num_failed_logins",
             "logged_in","num_compromised",
             "root_shell","su_attempted",
             "num_root","num_file_creations",
             "num_shells","num_access_files",
            "num_outbound_cmds",
             "is_hot_login","is_guest_login",
             "count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate",
             "srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate",
             "dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate",
             "dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate",
             "dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]


def parse_interaction_categorical(line):
    line_split = line.split(",")
    clean_line_split = line_split[6:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, 
        array([float(x) for x in clean_line_split]))

training_data_categorical = raw_data.map(parse_interaction_categorical)


from pyspark.mllib.stat import Statistics

chi = Statistics.chiSqTest(training_data_categorical)

Now we can check the resulting values after putting them into a Pandas data frame.

import pandas as pd
pd.set_option('display.max_colwidth', 30)

records = [(result.statistic, result.pValue) for result in chi]

chi_df = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","p-value"])

chi_df 
label statistic p-value
land 0.464984 0.495304
wrong_fragment 306.855508 0.000000
urgent 38.718442 0.000000
hot 19463.314328 0.000000
num_failed_logins 127.769106 0.000000
logged_in 3273098.055702 0.000000
num_compromised 2011.862724 0.000000
root_shell 1044.917853 0.000000
su_attempted 433.999968 0.000000
num_root 22871.675646 0.000000
num_file_creations 9179.739210 0.000000
num_shells 1380.027801 0.000000
num_access_files 18734.770753 0.000000
num_outbound_cmds 0.000000 1.000000
is_hot_login 8.070987 0.004498
is_guest_login 13500.511066 0.000000
count 4546398.359513 0.000000
srv_count 2296059.697747 0.000000
serror_rate 268419.919232 0.000000
srv_serror_rate 302626.967659 0.000000
rerror_rate 9860.453313 0.000000
srv_rerror_rate 32476.385256 0.000000
same_srv_rate 399912.444046 0.000000
diff_srv_rate 390999.770051 0.000000
srv_diff_host_rate 1365458.716670 0.000000
dst_host_count 2520478.944345 0.000000
dst_host_srv_count 1439086.154193 0.000000
dst_host_same_srv_rate 1237931.905888 0.000000
dst_host_diff_srv_rate 1339002.406545 0.000000
dst_host_same_src_port_rate 2915195.296767 0.000000
dst_host_srv_diff_host_rate 2226290.874366 0.000000
dst_host_serror_rate 407454.601492 0.000000
dst_host_srv_serror_rate 455099.001618 0.000000
dst_host_rerror_rate 136478.956325 0.000000
dst_host_srv_rerror_rate 254547.369785 0.000000

From that we conclude that predictors land and num_outbound_cmds could be removed from our model without affecting our accuracy dramatically. Let's try this.

Evaluating the new model

So the only modification to our first parse_interaction function will be to remove columns 6 and 19, corresponding to the two predictors that we want not to be part of our model.

def parse_interaction_chi(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,6,19,41]
    clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_chi = raw_data.map(parse_interaction_chi)
test_data_chi = test_raw_data.map(parse_interaction_chi)

Now we build the logistic regression classifier again.

# Build the model
t0 = time()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 356.507 seconds

And evaluate in test data.

labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data_chi.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 34.656 seconds. Test accuracy is 0.9164

So we can see that, by using hypothesis testing, we have been able to remove two predictors without diminishing testing accuracy at all. Training time improved a bit as well. This might now seem like a big model reduction, but it is something when dealing with big data sources. Moreover, we should be able to categorise those five predictors we have left out for different reasons and, either include them in the model or leave them out if they aren't statistically significant.

Additionally, we could try to remove some of those predictors that are highly correlated, trying not to reduce accuracy too much. In the end, we should end up with a model easier to understand and use.

Discover and read more posts from Jose A Dianes
get started
post comments1Reply
Joe Han
9 years ago

Hello,
I’m running spark on python 3. The line below produces an error.
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())

According to my google search, python 3 does not take tuple as an input so for python 3 users, this code below should work
test_accuracy = labels_and_preds.filter(lambda vp: vp[0] == vp[1]).count() / float(test_data.count())

Excuse my variable naming due to my ignorance.