Cleaning broadcast variables

Broadcast variables occupy memory on all executors and, depending on the size of the data they contain, can eventually cause resource issues. Spark provides a way to remove broadcast data from the memory of all executors.

Calling unpersist() on a broadcast variable removes its data from the memory cache of all executors to free up resources. If the variable is used again, the data is retransmitted to the executors so that it can be used once more. The Driver, however, holds onto its copy of the data, because if the Driver no longer had the data, the broadcast variable would cease to be valid.

We will look at destroying broadcast variables in the next section.

The following is an example of how unpersist() can be invoked on a broadcast variable. If we access the broadcast variable again after calling unpersist(), it works as usual, but behind the scenes the executors pull the data for the variable again.

scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at parallelize at <console>:25

scala> val k = 5
k: Int = 5

scala> val bk = sc.broadcast(k)
bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163)

scala> rdd_one.map(j => j + bk.value).take(5)
res184: Array[Int] = Array(6, 7, 8)

scala> bk.unpersist

scala> rdd_one.map(j => j + bk.value).take(5)
res186: Array[Int] = Array(6, 7, 8)
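
unpersist() can also be called with a blocking flag when you need to be certain that the executor-side copies are gone before proceeding. The following is a minimal sketch of this variant; the Map data and the names bm and rdd_two are hypothetical, and it assumes a live SparkContext named sc, as in the preceding shell session:

val bm = sc.broadcast(Map("a" -> 1, "b" -> 2))  // hypothetical lookup data
val rdd_two = sc.parallelize(Seq("a", "b", "c"))

// First use: the Map is shipped to each executor and cached there
rdd_two.map(key => bm.value.getOrElse(key, 0)).collect()

// Blocking removal: returns only once the executors have dropped their copies
bm.unpersist(blocking = true)

// The variable is still valid; the Driver re-ships the data on the next use
rdd_two.map(key => bm.value.getOrElse(key, 0)).collect()

The non-blocking default (unpersist() with no arguments) issues the removal asynchronously, which is usually sufficient; blocking is useful when you want memory reclaimed deterministically before the next stage runs.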