Logging with log4j in Spark

Spark uses log4j for its own logging. Everything that happens in the backend is logged to the Spark shell console (which is already configured to the underlying storage). Spark provides a log4j template as a properties file, and we can extend and modify that file for logging in Spark. Move to the SPARK_HOME/conf directory and you should see the log4j.properties.template file. This can serve as the starting point for our own logging system.

Now, let's create our own custom logging setup for running a Spark job. Make a copy of the template and edit it; when you are done, rename the file as log4j.properties and put it under the same directory (that is, the project tree). A sample snapshot of the file can be seen as follows:

Figure 17: A snapshot of the log4j.properties file

By default, everything goes to the console and file. However, if you want to redirect all the noisier logs to a system file located at, say, /var/log/sparkU.log, then you can set these properties in the log4j.properties file as follows:

log4j.logger.spark.storage=INFO, RollingAppender
log4j.logger.spark.scheduler=INFO, RollingAppender
log4j.logger.spark.CacheTracker=INFO, RollingAppender
log4j.logger.spark.CacheTrackerActor=INFO, RollingAppender
log4j.logger.spark.MapOutputTrackerActor=INFO, RollingAppender
log4j.logger.spark.MapOutputTracker=INFO, RollingAppender
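
The preceding lines refer to an appender named RollingAppender, which the excerpt does not define. A minimal sketch of such a definition, assuming log4j 1.x, might look like the following. The file size limit, backup count, and conversion pattern are illustrative assumptions rather than values taken from the original configuration:

```properties
# Hypothetical definition of the RollingAppender referenced above (log4j 1.x syntax)
log4j.appender.RollingAppender=org.apache.log4j.RollingFileAppender
log4j.appender.RollingAppender.File=/var/log/sparkU.log
# Assumed rotation policy: roll over at 10 MB and keep 5 old files
log4j.appender.RollingAppender.MaxFileSize=10MB
log4j.appender.RollingAppender.MaxBackupIndex=5
# Assumed layout: timestamp, level, short logger name, message
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

The user running the Spark job needs write permission on the target file for this to work.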

Basically, we want to hide all logs Spark generates so that we don't have to deal with them in the shell. We redirect them to be logged in the filesystem. On the other hand, we want our own logs to be logged in the shell and a separate file so that they don't get mixed up with the ones from Spark. From here, we will point Splunk to the files where our own logs are, which in this particular case is /var/log/sparkU.log.


The log4j.properties file is then picked up by Spark when the application starts, so we don't have to do anything aside from placing it in the mentioned location.

Now let's see how we can create our own logging system. Look at the following code and try to understand what is happening here:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, LogManager}

object MyLog {
  def main(args: Array[String]): Unit = {
    // Setting the logger level to WARN
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)

    // Creating the Spark context
    val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Starting the computation and printing the logging information
    log.warn("Started")
    val data = sc.parallelize(1 to 100000)
    log.warn("Finished")
  }
}

Conceptually, the preceding code logs only warning messages. It first prints a warning message and then creates an RDD by parallelizing the numbers from 1 to 100,000. Once the RDD has been created, it prints another warning message. However, there is a problem with this code segment that we haven't noticed yet.

One drawback of the org.apache.log4j.Logger class is that it is not serializable (refer to the optimization technique section for more details), which implies that we cannot use it inside a closure passed to some parts of the Spark API. For example, if you try to execute the following code, you should see an exception that says Task not serializable:

object MyLog {
  def main(args: Array[String]): Unit = {
    // Setting the logger level to WARN
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)
    // Creating the Spark context
    val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Starting the computation and printing the logging information
    val i = 0
    val data = sc.parallelize(i to 100000)
    // Fails: the closure captures log, which is not serializable
    data.foreach(x => log.info("My number " + x))
  }
}

Solving this problem is also easy; just declare the Scala class with extends Serializable and keep the logger in a @transient lazy val, so that the code looks like the following:

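// Requires: import org.apache.spark.rdd.RDD
class MyMapper(n: Int) extends Serializable {
  // Not serialized with the instance; re-created on first use
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
  def MyMapperDosomething(rdd: RDD[Int]): RDD[String] =
    rdd.map { i =>
      log.warn("mapping: " + i)
      (i + n).toString
    }
}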

In the preceding code, the closure passed to map cannot capture the logger on its own, because the logger is not serializable. Instead, the closure references the enclosing MyMapper instance, so the whole instance is serialized and distributed to all partitions. Because the logger is marked @transient, it is skipped during serialization, and because it is lazy, each deserialized copy creates a new logger on first use and uses it for logging.
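
The effect of @transient lazy val can be checked without a Spark cluster. The following is a minimal sketch that round-trips two objects through plain Java serialization, which is what Spark uses for closures. To stay free of external dependencies, it substitutes java.util.logging.Logger (also not serializable) for the log4j logger; the names EagerMapper, LazyMapper, and SerializationSketch are hypothetical and exist only for this illustration:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException}
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.util.logging.Logger

// Holds the logger in an ordinary field, so serialization tries to write it out
class EagerMapper(val n: Int) extends Serializable {
  val log: Logger = Logger.getLogger("eager")
}

// @transient: the logger is skipped when the instance is serialized
// lazy: the logger is re-created on first use after deserialization
class LazyMapper(val n: Int) extends Serializable {
  @transient lazy val log: Logger = Logger.getLogger("lazy")
  def describe(i: Int): String = {
    log.warning("mapping: " + i)
    (i + n).toString
  }
}

object SerializationSketch {
  // Serializes a value to bytes and reads it back, as happens when a task is shipped
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // True when serializing the eager variant is rejected
  def eagerFails: Boolean =
    try { roundTrip(new EagerMapper(1)); false }
    catch { case _: NotSerializableException => true }

  def main(args: Array[String]): Unit = {
    println("eager mapper fails to serialize: " + eagerFails)
    val copy = roundTrip(new LazyMapper(1))
    // The copy builds its own logger here, on first access
    println(copy.describe(41))
  }
}
```

The eager variant fails for the same underlying reason as the Task not serializable example, while the lazy variant survives the round trip and logs from the deserialized copy.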

In summary, the following is the complete code that helps us to get rid of this problem:

package com.example.Personal
import org.apache.log4j.{Level, LogManager}
import org.apache.spark._
import org.apache.spark.rdd.RDD

class MyMapper(n: Int) extends Serializable {
  // Not serialized with the instance; re-created on first use
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
  def MyMapperDosomething(rdd: RDD[Int]): RDD[String] =
    rdd.map { i =>
      log.warn("mapping: " + i)
      (i + n).toString
    }
}

object MyMapper {
  def apply(n: Int): MyMapper = new MyMapper(n)
}

object MyLog {
  def main(args: Array[String]): Unit = {
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)
    val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
    val sc = new SparkContext(conf)
    log.warn("Started")
    val data = sc.parallelize(1 to 100000)
    val mapper = MyMapper(1)
    val other = mapper.MyMapperDosomething(data)
    // An action is needed to run the map and emit the per-element logs
    other.collect()
    log.warn("Finished")
  }
}

The output is as follows:

17/04/29 15:33:43 WARN root: Started 
17/04/29 15:31:51 WARN myLogger: mapping: 1
17/04/29 15:31:51 WARN myLogger: mapping: 49992
17/04/29 15:31:51 WARN myLogger: mapping: 49999
17/04/29 15:31:51 WARN myLogger: mapping: 50000
17/04/29 15:31:51 WARN root: Finished

We will discuss the built-in logging of Spark in the next section.
