Actions

Actions, in contrast to transformations, execute the scheduled tasks on the dataset; once you have finished transforming your data, calling an action executes your transformations. The scheduled chain might contain no transformations at all (for example, .take(n) will just return n records from an RDD even if you did not apply any transformations to it) or it might execute the whole chain of transformations.

The .take(...) method

This is arguably the most useful (and most used) method, much like the .map(...) method among transformations. It is preferred to .collect(...) as it only returns the n top rows from a single data partition, in contrast to .collect(...), which returns the whole RDD. This is especially important when you deal with large datasets:

data_first = data_from_file_conv.take(1)

If you want somewhat randomized records, you can use .takeSample(...) instead, which takes three arguments: the first specifies whether the sampling should be with replacement, the second specifies the number of records to return, and the third is a seed for the pseudo-random number generator:

data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)

The .collect(...) method

This method returns all the elements of the RDD to the driver. Since we have just cautioned against doing this with large datasets, we will not repeat ourselves here.
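
To illustrate it on a small RDD where returning everything to the driver is harmless (a minimal sketch; small_rdd is just a throwaway example name):

small_rdd = sc.parallelize([1, 2, 3])
small_rdd.collect()  # returns the whole list [1, 2, 3] to the driver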

The .reduce(...) method

The .reduce(...) method reduces the elements of an RDD using a specified method.

You can use it to sum the elements of your RDD:

rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

This will produce the sum of 15.

We first create a new RDD containing only the values of rdd1 using the .map(...) transformation, and then use the .reduce(...) method to process the results. The .reduce(...) method runs the summation method (here expressed as a lambda) on each partition and returns the partial results to the driver node, where the final aggregation takes place.
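
For reference, rdd1 here is assumed to be a pair RDD whose values sum to 15; a hypothetical definition that would reproduce the result above is:

rdd1 = sc.parallelize([('a', 4), ('b', 5), ('c', 6)])  # hypothetical data: 4 + 5 + 6 = 15
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)  # returns 15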

Note

A word of caution is necessary here. The function passed as a reducer needs to be associative, that is, the result does not change when the way the elements are grouped changes, and commutative, that is, changing the order of the operands does not change the result either.

An example of the associativity rule is (5 + 2) + 3 = 5 + (2 + 3), and of the commutativity rule, 5 + 2 + 3 = 3 + 2 + 5. Thus, you need to be careful about what functions you pass to the reducer.

If you ignore the preceding rule, you might run into trouble (assuming your code runs at all). For example, let's assume we have the following RDD (with one partition only!):

data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)

If we were to reduce the data by dividing the current result by the subsequent value, we would expect a value of 10:

works = data_reduce.reduce(lambda x, y: x / y)

However, if you were to partition the data into three partitions, the result would be wrong:

data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y: x / y)

It will produce 0.004.
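
To see where the 0.004 comes from, you can peek at how the six values are distributed across the three partitions with .glom(), which collects each partition into a list; the split shown in the comment below is the one Spark typically produces for this call, so treat it as an assumption:

data_reduce.glom().collect()
# typically [[1, 2], [0.5, 0.1], [5, 0.2]]
# per-partition reductions: 1 / 2 = 0.5, 0.5 / 0.1 = 5, 5 / 0.2 = 25
# the driver then combines the partials: 0.5 / 5 / 25 = 0.004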

The .reduceByKey(...) method works in a similar way to the .reduce(...) method, but it performs a reduction on a key-by-key basis:

data_key = sc.parallelize(
    [('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),
     ('d', 3)],4)
data_key.reduceByKey(lambda x, y: x + y).collect()

The preceding code sums the values per key and produces ('a', 12), ('b', 4), ('c', 2), and ('d', 5) (the order of the pairs returned by .collect() may vary).

The .count(...) method

The .count(...) method counts the number of elements in the RDD. Use the following code:

data_reduce.count()

This code will produce 6, the exact number of elements in the data_reduce RDD.

The .count(...) method produces the same result as the following method, but it does not require moving the whole dataset to the driver:

len(data_reduce.collect()) # WRONG -- DON'T DO THIS!

If your dataset is in a key-value form, you can use the .countByKey() method to get the counts of distinct keys. Run the following code:

data_key.countByKey().items()

This code will produce the counts of elements per key: 'a': 2, 'b': 2, 'c': 1, and 'd': 2.

The .saveAsTextFile(...) method

As the name suggests, the .saveAsTextFile(...) method takes the RDD and saves it to text files: each partition is written to a separate file:

data_key.saveAsTextFile(
'/Users/drabast/Documents/PySpark_Data/data_key.txt')

To read it back, you need to parse it back as all the rows are treated as strings:

def parseInput(row):
    import re
    # each saved row looks like "('a', 4)", so capture the key and the value
    pattern = re.compile(r"\('([a-z])', ([0-9])\)")
    row_split = pattern.split(row)
    return (row_split[1], int(row_split[2]))

data_key_reread = sc \
    .textFile(
        '/Users/drabast/Documents/PySpark_Data/data_key.txt') \
    .map(parseInput)
data_key_reread.collect()

The list of key-value pairs read back matches what we had initially.

The .foreach(...) method

This is a method that applies the same function to each element of the RDD in an iterative way; in contrast to .map(...), the .foreach(...) method applies a defined function to each record one by one and does not return anything to the driver. It is useful when you want to save the data to a database that is not natively supported by PySpark.

Here, we'll use it to print (to the CLI, not to the Jupyter Notebook) all the records stored in the data_key RDD:

def f(x): 
    print(x)

data_key.foreach(f)

If you now navigate to the CLI, you should see all the records printed out. Note that the order will most likely be different every time.
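
As a sketch of the database use case mentioned above, you would typically pair .foreach(...) with its per-partition sibling, .foreachPartition(...), so that one connection is opened per partition rather than per record; the db_client module and its connect/insert calls below are hypothetical placeholders for whatever driver your database provides:

def save_partition(records):
    import db_client                            # hypothetical database driver
    conn = db_client.connect('db-host:5432')    # hypothetical connection call
    for key, value in records:                  # records is an iterator over the partition
        conn.insert(key, value)                 # hypothetical insert call
    conn.close()

data_key.foreachPartition(save_partition)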
