Broadcast and accumulator variables together—an example

Although broadcast and accumulator variables are simple and very limited (one is read-only, the other is write-only), they can be combined to build fairly complex operations. For example, let's try to apply different machine learning algorithms on the iris dataset in a distributed environment. We will build a Spark job in the following way:

  • The dataset is read and broadcast to all the nodes (as it's small enough to fit in memory).
  • Each node will use a different classifier on the dataset and return the classifier name and its accuracy score on the full dataset. Note that, to keep things easy in this simple example, we won't do any preprocessing, train/test splitting, or hyperparameter optimization.
  • If a classifier raises an exception, the string representation of the error, along with the classifier name, should be stored in an accumulator.
  • The final output should contain a list of the classifiers that performed the classification task without errors, together with their accuracy scores.

As the first step, we load the iris dataset and broadcast it to all the nodes in the cluster:

In: from sklearn.datasets import load_iris
    bcast_dataset = sc.broadcast(load_iris())

Now, let's continue coding by creating a custom accumulator. It will contain a list of tuples that store the classifier name and the exception it experienced as a string. The custom accumulator is derived from the AccumulatorParam class and should implement at least two methods: zero (which is called when the accumulator is initialized) and addInPlace (which is called when the add method is called on the accumulator).

The easiest way to do this is shown in the following code, followed by its initialization as an empty list. Bear in mind that the additive operation is a bit tricky: we need to combine two elements—a tuple and a list—but we don't know which element is the list and which is the tuple; therefore, we first ensure that both elements are lists, and then we can proceed to concatenate them in an easy way (by using the plus operator):

In: from pyspark import AccumulatorParam

    class ErrorAccumulator(AccumulatorParam):
        def zero(self, initialList):
            return initialList

        def addInPlace(self, v1, v2):
            if not isinstance(v1, list):
                v1 = [v1]
            if not isinstance(v2, list):
                v2 = [v2]
            return v1 + v2

    errAccum = sc.accumulator([], ErrorAccumulator())
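
Before using the accumulator in the Spark job, we can verify the merge logic locally on the driver. The following quick check uses a throwaway accumulator and made-up error tuples; the expected value is shown as a comment:

In: test_accum = sc.accumulator([], ErrorAccumulator())
    # add() passes each tuple to addInPlace, which wraps it in a list
    # and concatenates it to the current value
    test_accum.add(('SomeClassifier', 'fit() failed'))
    test_accum.add(('OtherClassifier', 'predict() failed'))
    # Expected: [('SomeClassifier', 'fit() failed'),
    #            ('OtherClassifier', 'predict() failed')]
    test_accum.value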

Now, let's define the mapping function: each node should train, test, and evaluate a classifier on the broadcast iris dataset. The function will receive the classifier object and the broadcast dataset as arguments, and should return a list containing a tuple with the classifier name and its accuracy score.

If an exception is raised along the way, the classifier name and the error, converted to a string, are added to the accumulator, and an empty list is returned:

In: def apply_classifier(clf, dataset):
        clf_name = clf.__class__.__name__
        X = dataset.value.data
        y = dataset.value.target
        try:
            from sklearn.metrics import accuracy_score
            clf.fit(X, y)
            y_pred = clf.predict(X)
            acc = accuracy_score(y, y_pred)
            return [(clf_name, acc)]
        except Exception as e:
            errAccum.add((clf_name, str(e)))
            return []
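
Since a broadcast variable's value attribute is also readable on the driver, we can sanity-check the function locally before distributing it. Here we try it with a DummyClassifier; on the balanced iris dataset, the most-frequent strategy should score about 0.33:

In: from sklearn.dummy import DummyClassifier
    # Local check on the driver; expected result: [('DummyClassifier', 0.33...)]
    apply_classifier(DummyClassifier(strategy='most_frequent'), bcast_dataset)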

Finally, we have arrived at the core of the job. We now instantiate a few objects from scikit-learn (some of them are deliberately not classifiers, in order to test the accumulator), transform them into an RDD, and apply the map function that we created in the previous cell. As the returned value is a list, we can use flatMap to collect only the outputs of the mappers that didn't raise an exception:

In: from sklearn.linear_model import SGDClassifier
    from sklearn.dummy import DummyClassifier
    from sklearn.decomposition import PCA
    from sklearn.manifold import MDS

    classifiers = [DummyClassifier(strategy='most_frequent'),
                   SGDClassifier(),
                   PCA(),
                   MDS()]

    (sc.parallelize(classifiers)
       .flatMap(lambda x: apply_classifier(x, bcast_dataset))
       .collect())

Out: [('DummyClassifier', 0.33333333333333331),
     ('SGDClassifier', 0.85333333333333339)]

As expected, only the real classifiers appear in the output. Let's now see which objects generated an error. Unsurprisingly, they are the two missing from the preceding output:

In: print("The errors are:", errAccum.value)

Out: The errors are: [('PCA', "'PCA' object has no attribute 'predict'"),
('MDS', "'MDS' object has no attribute 'predict'")]

As a final step, let's clean up the broadcast dataset:

In: bcast_dataset.unpersist()

Remember that, in this example, we've used a small dataset that could be broadcast. In real-world big data problems, you'll need to load the dataset from HDFS and broadcast the HDFS path instead, letting each node read the data itself; a minimal sketch of this pattern follows.
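
The following is only a rough sketch of that idea: the HDFS location and the CSV layout are hypothetical, and it assumes each worker can read the path directly (for instance, through pandas with an HDFS-capable fsspec backend installed). Only the path string is broadcast; every node loads the data on its own:

In: # Hypothetical location: broadcast only the path, not the data itself
    bcast_path = sc.broadcast('hdfs:///datasets/iris/iris.csv')

    def apply_classifier_from_path(clf, path_bcast):
        import pandas as pd
        from sklearn.metrics import accuracy_score
        clf_name = clf.__class__.__name__
        try:
            # Each worker reads the dataset from shared storage
            # (assumes the workers can resolve the hdfs:// path)
            df = pd.read_csv(path_bcast.value)
            X = df.drop('target', axis=1).values
            y = df['target'].values
            clf.fit(X, y)
            return [(clf_name, accuracy_score(y, clf.predict(X)))]
        except Exception as e:
            errAccum.add((clf_name, str(e)))
            return []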
