Apache Pig is a high-level language for creating MapReduce applications. This recipe will use Apache Pig and a Pig user-defined filter function (UDF) to remove all bot traffic from a sample web server log dataset. Bot traffic is non-human traffic that visits a webpage, such as traffic from spiders and web crawlers.
You will need to download the apache_tsv.txt and useragent_blacklist.txt files from the support page on the Packt website, http://www.packtpub.com/support. Place apache_tsv.txt in HDFS and put useragent_blacklist.txt in your current working directory.

Carry out the following steps to filter bot traffic using an Apache Pig UDF:
1. First, write a Pig UDF that extends the FilterFunc abstract class. This class will be used to filter records in the weblogs dataset by using the user agent string.

package com.packt.ch3.etl.pig;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class IsUseragentBot extends FilterFunc {

    private Set<String> blacklist = null;

    // Load the blacklisted user agent strings from the "blacklist"
    // symlink created by the distributed cache.
    private void loadBlacklist() throws IOException {
        blacklist = new HashSet<String>();
        BufferedReader in = new BufferedReader(new FileReader("blacklist"));
        String userAgent = null;
        while ((userAgent = in.readLine()) != null) {
            blacklist.add(userAgent);
        }
        in.close();
    }

    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        if (blacklist == null) {
            loadBlacklist();
        }
        if (tuple == null || tuple.size() == 0) {
            return null;
        }
        String ua = (String) tuple.get(0);
        if (blacklist.contains(ua)) {
            return true;
        }
        return false;
    }
}
2. Next, copy useragent_blacklist.txt into HDFS (for example, with hadoop fs -put useragent_blacklist.txt /user/hadoop/blacklist.txt), and add it to the distributed cache by setting the following properties at the start of the Pig script:

set mapred.cache.files '/user/hadoop/blacklist.txt#blacklist';
set mapred.create.symlink 'yes';
3. Register the JAR file containing the IsUseragentBot class with Pig, and write the Pig script to filter the weblogs by the user agent:

register myudfjar.jar;

all_weblogs = LOAD '/user/hadoop/apache_tsv.txt' AS (ip:chararray, timestamp:long, page:chararray, http_status:int, payload_size:int, useragent:chararray);

nobots_weblogs = FILTER all_weblogs BY NOT com.packt.ch3.etl.pig.IsUseragentBot(useragent);

STORE nobots_weblogs INTO '/user/hadoop/nobots_weblogs';
4. To run the Pig job, put myudfjar.jar into the same folder as the Pig script and execute it:

$ ls
myudfjar.jar  filter_bot_traffic.pig
$ pig -f filter_bot_traffic.pig
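When the job completes, the filtered weblog records are written to /user/hadoop/nobots_weblogs in HDFS, as specified by the STORE statement in the script.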
Apache Pig is extensible through the use of user-defined functions (UDFs). One way to create a UDF is to use the Java abstract classes and interfaces that ship with the Apache Pig distribution. In this recipe, we wanted to remove all records that contain known bot user agent strings, so we created our own Pig filter function.
The IsUseragentBot class extends the abstract class FilterFunc, which allows us to override the exec(Tuple t) method. A Pig Tuple is an ordered list of fields that can be any Pig primitive, or null. At runtime, Pig will feed the exec(Tuple t) method of the IsUseragentBot class with the user agent strings from our dataset. The UDF extracts the user agent string by accessing the first field in the Tuple, and it returns true if the user agent string belongs to a known bot; otherwise it returns false.
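To see this contract in isolation, the following is a minimal sketch (not part of the recipe) that builds a one-field Tuple by hand and invokes the UDF directly. It assumes the Pig JAR and the compiled IsUseragentBot class are on the classpath and that a file named blacklist exists in the current working directory; the class name IsUseragentBotSketch is hypothetical.

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.packt.ch3.etl.pig.IsUseragentBot;

public class IsUseragentBotSketch {
    public static void main(String[] args) throws Exception {
        IsUseragentBot isBot = new IsUseragentBot();

        // Build a one-field tuple holding a user agent string.
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, "Googlebot/2.1 (+http://www.google.com/bot.html)");

        // Prints true only if this exact string appears in the blacklist file.
        System.out.println(isBot.exec(t));
    }
}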
In addition, the IsUseragentBot UDF reads a file called blacklist and loads its contents into a HashSet instance. The file named blacklist is a symbolic link to blacklist.txt, which has been distributed to the nodes in the cluster using the distributed cache mechanism. To place a file into the distributed cache, and to create the symbolic link, set the following MapReduce properties:
set mapred.cache.files '/user/hadoop/blacklist.txt#blacklist';
set mapred.create.symlink 'yes';
It is important to note that these are not Pig-specific properties; they are used by the MapReduce framework, so you can set them to load a file into the distributed cache for any MapReduce job.
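For reference, here is a sketch of how the same two properties are typically set programmatically in a plain MapReduce driver, using the org.apache.hadoop.filecache.DistributedCache API of the Hadoop 1.x era; the class name CacheSetupSketch is hypothetical.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;

public class CacheSetupSketch {
    // Configure the distributed cache for a generic MapReduce job.
    public static void configure(Configuration conf) throws Exception {
        // Equivalent to: set mapred.cache.files '/user/hadoop/blacklist.txt#blacklist';
        DistributedCache.addCacheFile(new URI("/user/hadoop/blacklist.txt#blacklist"), conf);
        // Equivalent to: set mapred.create.symlink 'yes';
        DistributedCache.createSymlink(conf);
    }
}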
Next, we told Pig where to find the JAR file containing the IsUseragentBot UDF:

register myudfjar.jar;
Finally, we call the UDF using its fully qualified Java class name. When the job runs, Pig will create an instance of the IsUseragentBot class and feed the exec(Tuple t) method with records from the all_weblogs relation.
Starting with Pig version 0.9, Pig UDFs can access the distributed cache without setting the mapred.cache.files and mapred.create.symlink properties. Most of the abstract Pig classes used to create UDFs now have a method named List<String> getCacheFiles() that can be overridden to load files from HDFS into the distributed cache. For example, the IsUseragentBot class can be modified to load the blacklist.txt file into the distributed cache by adding the following method:
@Override
public List<String> getCacheFiles() {
    List<String> list = new ArrayList<String>();
    // The '#blacklist' fragment creates the symlink named "blacklist".
    list.add("/user/hadoop/blacklist.txt#blacklist");
    return list;
}
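With this override in place, Pig ships blacklist.txt to the distributed cache and creates the blacklist symlink itself, so the two set statements at the top of the Pig script are no longer needed.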