This section shows an example of building a machine learning application for spam detection using the RDD-based API. The next section shows an example based on the DataFrame-based API.
Let's use two algorithms to build a spam classifier:

- HashingTF to build term frequency feature vectors from the text of spam and ham e-mails
- LogisticRegressionWithSGD to build a model to separate the types of messages, such as spam or ham

As you learned how to use notebooks in Chapter 6, Notebooks and Dataflows with Spark and Hadoop, you may execute the following code in the IPython Notebook or Zeppelin Notebook. You can execute the code from the command line as well:
```
[cloudera@quickstart ~]$ cat spam_messages.txt
$$$ Send money
100% free
Amazing stuff
Home based
Reverses aging
No investment
Send SSN and password

[cloudera@quickstart ~]$ cat ham_messages.txt
Thank you for attending conference
Message from school
Recommended courses for you
Your order is ready for pickup
Congratulations on your anniversary
```
Copy both files to HDFS:

```
[cloudera@quickstart ~]$ hadoop fs -put spam_messages.txt
[cloudera@quickstart ~]$ hadoop fs -put ham_messages.txt
```
Start the PySpark shell in local mode. Change the master to yarn-client to execute on YARN:

```
[cloudera@quickstart ~]$ pyspark --master local[*]
```
```
>>> from pyspark import SparkContext
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.classification import LogisticRegressionWithSGD
>>> from pyspark.mllib.feature import HashingTF
```
Load the spam and ham messages as RDDs:

```
>>> spam_messages = sc.textFile("spam_messages.txt")
>>> ham_messages = sc.textFile("ham_messages.txt")
```
Create a HashingTF instance to map the e-mail text to vectors of 100 features. Split each e-mail into words, and then map each word to one feature:

```
>>> tf = HashingTF(numFeatures = 100)
>>> spam_features = spam_messages.map(lambda email: tf.transform(email.split(" ")))
>>> ham_features = ham_messages.map(lambda email: tf.transform(email.split(" ")))
```
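Under the hood, HashingTF uses the "hashing trick": instead of building a dictionary of all words, it hashes each word to a bucket index and counts occurrences per bucket. A minimal pure-Python sketch of the idea follows (illustrative only; MLlib uses its own hash function, so the resulting indices will differ from Spark's):

```python
# Sketch of the hashing trick behind HashingTF. Collisions are possible:
# two different words may share a bucket, which is the price of a
# fixed-size feature space with no vocabulary dictionary.
def hashing_tf(words, num_features=100):
    """Map a list of words to a sparse term-frequency vector."""
    counts = {}
    for word in words:
        index = hash(word) % num_features   # bucket each word by its hash
        counts[index] = counts.get(index, 0) + 1.0
    return counts  # {feature_index: term_frequency}

# Four distinct words produce (up to) four non-zero entries:
print(hashing_tf("Send SSN and password".split(" ")))
```

This explains why the vectors printed later are sparse: only the hashed indices of the words actually present are non-zero.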
Create LabeledPoint datasets for positive (spam) and negative (ham) examples. A LabeledPoint consists simply of a label and a features vector:

```
>>> positive_examples = spam_features.map(lambda features: LabeledPoint(1, features))
>>> negative_examples = ham_features.map(lambda features: LabeledPoint(0, features))
```
Combine the positive and negative examples to create the training data, and cache it, since logistic regression is an iterative algorithm:

```
>>> training_data = positive_examples.union(negative_examples)
>>> training_data.cache()
>>> training_data.collect()
[LabeledPoint(1.0, (100,[29,50,71],[1.0,1.0,1.0])), LabeledPoint(1.0, (100,[0,34,38],[1.0,1.0,1.0])), LabeledPoint(1.0, (100,[0,86,91],[1.0,1.0,1.0])), LabeledPoint(1.0, (100,[1,48],[1.0,1.0])), LabeledPoint(1.0, (100,[65,93],[1.0,1.0])), LabeledPoint(1.0, (100,[85,91],[1.0,1.0])), LabeledPoint(1.0, (100,[50,55,76,79],[1.0,1.0,1.0,1.0])), LabeledPoint(0.0, (100,[20,25,57,82],[1.0,1.0,1.0,2.0])), LabeledPoint(0.0, (100,[46,66,92],[1.0,1.0,1.0])), LabeledPoint(0.0, (100,[25,82,94],[1.0,2.0,1.0])), LabeledPoint(0.0, (100,[1,25,62,78,82,92],[1.0,1.0,1.0,1.0,1.0,1.0])), LabeledPoint(0.0, (100,[21,81,88,89],[1.0,1.0,1.0,1.0]))]
```
Run the logistic regression algorithm using the SGD optimizer, and then inspect the learned weights and intercept:

```
>>> model = LogisticRegressionWithSGD.train(training_data)
>>> model
(weights=[0.747245937607,0.312621971183,0.0,0.0,0.0,0.0,0.0,0.0,
0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.182791053614,
-0.396071137454,0.0,0.0,0.0,-0.616391394849,0.0,0.0,0.0,
0.399792136011,0.0,0.0,0.0,0.0,0.400316310041,0.0,0.0,0.0,
0.400316310041,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.412567752266,
0.0,0.548276952195,0.0,0.754346808086,0.0,0.0,0.0,0.0,
0.354554672075,0.0,-0.182791053614,0.0,0.0,0.0,0.0,
-0.235654981013,0.0,0.0,0.508145068107,-0.412567752266,
0.0,0.0,0.0,0.0,0.399792136011,0.0,0.0,0.0,0.0,0.354554672075,
0.0,-0.235654981013,0.354554672075,0.0,-0.396071137454,
-0.997127808685,0.0,0.0,0.454803011847,0.346929627566,0.0,
-0.396071137454,-0.396071137454,0.0,0.801732639413,
-0.648222733279,0.508145068107,-0.197945360222,0.0,0.0,0.0,0.0,
0.0], intercept=0.0)
```
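To see how these weights turn into a spam/ham decision: logistic regression computes the dot product of the weights with the feature vector, passes it through the sigmoid function, and thresholds the resulting probability at 0.5. A minimal sketch of that decision rule in plain Python (the three weights below are made up for illustration, not taken from the model above):

```python
import math

def predict(weights, intercept, features, threshold=0.5):
    """Logistic regression decision rule: sigmoid(w . x + b) >= threshold."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    probability = 1.0 / (1.0 + math.exp(-margin))  # squash margin into (0, 1)
    return 1 if probability >= threshold else 0

# Toy 3-feature example with illustrative weights:
weights = [0.75, -0.4, 0.5]
print(predict(weights, 0.0, [1.0, 0.0, 1.0]))  # positive margin -> 1 (spam)
print(predict(weights, 0.0, [0.0, 1.0, 0.0]))  # negative margin -> 0 (ham)
```

This matches the pattern in the weight vector above: indices hit by spam words have positive weights, indices hit by ham words have negative weights, and indices never seen in training stay at 0.0.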
Test the model on a positive (spam) example and a negative (ham) example, applying the same HashingTF feature transformation used on the training data:

```
>>> pos_example = tf.transform("No investment required".split(" "))
>>> neg_example = tf.transform("Data Science courses recommended for you".split(" "))
```
Now use the learned model to predict spam or ham:

```
>>> print "Prediction for positive test: %g" % model.predict(pos_example)
Prediction for positive test: 1
>>> print "Prediction for negative test: %g" % model.predict(neg_example)
Prediction for negative test: 0
```
Try other messages and check the accuracy of the algorithm. If you provide more training data to the algorithm, the accuracy will improve.
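Rather than eyeballing individual predictions, accuracy can be measured as the fraction of messages classified correctly. A minimal sketch in plain Python (the labels and predictions below are hypothetical, standing in for the model's output on a set of test messages):

```python
def accuracy(labels, predictions):
    """Fraction of examples where the predicted label matches the true one."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / float(len(labels))

# Hypothetical true labels (1 = spam, 0 = ham) and model predictions:
labels      = [1, 1, 1, 0, 0, 0]
predictions = [1, 1, 0, 0, 0, 0]
print("Accuracy: %g" % accuracy(labels, predictions))  # prints "Accuracy: 0.833333"
```

In the PySpark shell, the same idea applies: map each labeled test example to a (label, prediction) pair with model.predict and count the matches.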