Step 2 - Creating the vocabulary and token counts to train the LDA after text pre-processing

The run() method takes parameters such as the input text, the predefined vocabulary size, and the stop word file:

def run(params: Params)

Then, it starts text pre-processing for the LDA model as follows (that is, inside the run method):

// Load documents, and prepare them for LDA.
val preprocessStart = System.nanoTime()
val (corpus, vocabArray, actualNumTokens) = preprocess(params.input, params.vocabSize, params.stopwordFile)
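
The summary statistics printed at the end of this step (corpus size, vocabulary size, and pre-processing time) can be derived from this tuple. The following is a minimal sketch that uses the same variable names as the summary printout shown later; the book's exact code may differ slightly:

// Derive the quantities used in the training corpus summary printed later
val actualCorpusSize = corpus.count()                               // number of documents
val actualVocabSize = vocabArray.length                             // number of vocabulary terms
val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9 // pre-processing time in seconds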

The Params case class is used to define the parameters to train the LDA model. This goes as follows:

// Setting the parameters before training the LDA model
case class Params(var input: String = "",
                  var ldaModel: LDAModel = null,
                  k: Int = 5,
                  maxIterations: Int = 100,
                  docConcentration: Double = 5,
                  topicConcentration: Double = 5,
                  vocabSize: Int = 2900000,
                  stopwordFile: String = "data/docs/stopWords.txt",
                  algorithm: String = "em",
                  checkpointDir: Option[String] = None,
                  checkpointInterval: Int = 100)
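
As a quick illustration, you could instantiate Params with a few overridden defaults and kick off the run as follows. The input path and the overridden values here are examples only, not the book's settings:

// Illustrative only: override a few defaults and start the run
val params = Params(
  input = "data/docs/",                     // example input directory
  k = 5,
  maxIterations = 50,
  checkpointDir = Some("data/checkpoints")) // enables the checkpointing shown below
run(params)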

For better results, you can set these parameters on a trial-and-error basis; alternatively, you should go with cross-validation for even better performance. Now, if you want to checkpoint the current training state, use the following lines of code:

if (params.checkpointDir.nonEmpty) {
  spark.sparkContext.setCheckpointDir(params.checkpointDir.get)
}

The preprocess method is used to process the raw text. First, let's read the whole text using the wholeTextFiles() method as follows:

val initialrdd = spark.sparkContext.wholeTextFiles(paths).map(_._2) 
initialrdd.cache()

In the preceding code, paths is the path to the text files. Then, we need to prepare the morphological RDD from the raw text, based on the lemma texts, as follows (mapPartitions is used so that the Morphology object is created only once per partition rather than once per document):

val rdd = initialrdd.mapPartitions { partition =>
  val morphology = new Morphology()
  partition.map { value => helperForLDA.getLemmaText(value, morphology) }
}.map(helperForLDA.filterSpecialCharacters)

Here, the getLemmaText() method from the helperForLDA class supplies the lemma texts, and the special characters, such as ("""[! @ # $ % ^ & * ( ) _ + - − , " ' ; : . ` ? --]"""), are then filtered out as a regular expression using the filterSpecialCharacters() method. The getLemmaText() method goes as follows:

def getLemmaText(document: String, morphology: Morphology) = {
  val string = new StringBuilder()
  val value = new Document(document).sentences().toList.flatMap { a =>
    val words = a.words().toList
    val tags = a.posTags().toList
    (words zip tags).toMap.map { a =>
      val newWord = morphology.lemma(a._1, a._2)
      // Keep only lemmas longer than three characters; shorter ones are dropped
      val addedWord = if (newWord.length > 3) newWord else ""
      string.append(addedWord + " ")
    }
  }
  string.toString()
}

It is to be noted that the Morphology() class computes the base form of English words by removing only inflections (not derivational morphology). That is, it only handles noun plurals, pronoun case, and verb endings, and not things such as comparative adjectives or derived nominals. The getLemmaText() method takes the document and the corresponding morphology and finally returns the lemmatized text.
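
For instance, the lemma() method maps an inflected word plus its part-of-speech tag to its base form. The inputs and expected outputs below are illustrative examples, not taken from the book's dataset:

// Illustrative behaviour of the Stanford lemmatizer
val morphology = new Morphology()
morphology.lemma("books", "NNS")   // expected to return "book" (plural noun -> singular)
morphology.lemma("running", "VBG") // expected to return "run" (verb ending removed)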

The Morphology class comes from the Stanford NLP group. To use it, you should have the following import in the main class file: edu.stanford.nlp.process.Morphology. In the pom.xml file, you will have to include the following entries as dependencies:

<dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.6.0</version>
    <classifier>models</classifier>
</dependency>

The filterSpecialCharacters() method goes as follows:

def filterSpecialCharacters(document: String) = document.replaceAll("""[! @ # $ % ^ & * ( ) _ + - − , " ' ; : . ` ? --]""", " ")

Once we have the RDD with special characters removed, we can create a DataFrame for building the text analytics pipeline:

import spark.implicits._ // required for the toDF() conversion

rdd.cache()
initialrdd.unpersist()
val df = rdd.toDF("docs")
df.show()

The DataFrame contains only the raw document texts, tagged as docs. A snapshot of the DataFrame is as follows:


Figure 5: Raw texts from the input dataset

Now, if you look at the preceding DataFrame carefully, you will see that we still need to tokenize the documents. Moreover, the DataFrame contains stop words, such as this, with, and so on, so we need to remove them as well. First, let's tokenize the documents using the RegexTokenizer API as follows:

val tokenizer = new RegexTokenizer()
  .setInputCol("docs")
  .setOutputCol("rawTokens")

Now let's remove all the stop words as follows:

val stopWordsRemover = new StopWordsRemover()
  .setInputCol("rawTokens")
  .setOutputCol("tokens")
stopWordsRemover.setStopWords(stopWordsRemover.getStopWords ++ customizedStopWords)
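
Note that customizedStopWords is not defined in the preceding snippet. Assuming it is read from the stop word file supplied through Params, with one stop word per line (both are assumptions), it could be prepared as follows:

// Sketch: load custom stop words from the stop word file (one word per line assumed)
val customizedStopWords: Array[String] =
  if (stopwordFile.isEmpty) {
    Array.empty[String]
  } else {
    spark.sparkContext.textFile(stopwordFile)
      .map(_.trim.toLowerCase)
      .filter(_.nonEmpty)
      .collect()
  }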

Furthermore, we also need to apply count vectorization to extract only the important features from the tokens. This also allows the count vectorizer to be chained as a stage of the pipeline. Let's do it as follows:

val countVectorizer = new CountVectorizer()
  .setVocabSize(vocabSize)
  .setInputCol("tokens")
  .setOutputCol("features")

When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary and generate a CountVectorizerModel. In other words, CountVectorizer is used to convert a collection of text documents to vectors of token (that is, term) counts. The CountVectorizerModel produces sparse representations for the documents over the vocabulary, which can then be fed to LDA. More technically, when the fit() method is invoked for the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus.
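
If you also want to prune terms that appear in very few documents, CountVectorizer exposes a minDF parameter in addition to vocabSize. A variant of the preceding stage might look like the following; the threshold of 2 is purely illustrative:

// Optional variant: additionally require each term to appear in at least two documents
val countVectorizer = new CountVectorizer()
  .setVocabSize(vocabSize)
  .setMinDF(2)
  .setInputCol("tokens")
  .setOutputCol("features")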

Now, create the pipeline by chaining the transformers (tokenizer, stopWordsRemover, and countVectorizer) as follows:

val pipeline = new Pipeline().setStages(Array(tokenizer, stopWordsRemover, countVectorizer)) 

Now, let's fit the pipeline and transform the data to obtain the documents, the vocabulary, and the token counts:

val model = pipeline.fit(df)
val documents = model.transform(df).select("features").rdd.map {
  case Row(features: MLVector) => Vectors.fromML(features)
}.zipWithIndex().map(_.swap)
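
The pattern match in the preceding code relies on the MLVector alias and on converting the new ml vectors into the older mllib vector type that the RDD-based LDA expects. If they are not already in scope, the following imports are needed:

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.{Vector => MLVector} // the DataFrame-based (ml) vector type
import org.apache.spark.mllib.linalg.Vectors           // Vectors.fromML converts it to the mllib type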

Finally, return the documents (as ID-vector pairs), the vocabulary, and the total token count, as follows:

(documents, model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary, documents.map(_._2.numActives).sum().toLong)

Now, let's see the statistics of the training data:

println() println("Training corpus summary:")
println("-------------------------------")
println("Training set size: " + actualCorpusSize + " documents")
println("Vocabulary size: " + actualVocabSize + " terms")
println("Number of tockens: " + actualNumTokens + " tokens")
println("Preprocessing time: " + preprocessElapsed + " sec")
println("-------------------------------")
println()
>>>
Training corpus summary:
-------------------------------
Training set size: 19 documents
Vocabulary size: 21611 terms
Number of tokens: 75784 tokens
Preprocessing time: 46.684682086 sec