A session represents a user's continuous interaction with a website; the session ends after a configurable period of inactivity. A new session begins once the user returns to the website after that inactivity timeout. This recipe uses Apache Pig and a Pig user-defined function (UDF) to generate the subset of records from apache_nobots_tsv.txt that marks the beginning of each session for a specific IP address.
You will need to download/compile/install the following:

apache_nobots_tsv.txt from http://www.packtpub.com/support

The following are the steps to create an Apache Pig UDF to sessionize web server log data:
First, create a class that extends the Pig abstract class EvalFunc and implements the Pig interface Accumulator. This class is responsible for applying the session logic to the web server log dataset:

public class Sessionize extends EvalFunc<DataBag> implements Accumulator<DataBag> {

    private long sessionLength = 0;
    private Long lastSession = null;
    private DataBag sessionBag = null;

    public Sessionize(String seconds) {
        sessionLength = Integer.parseInt(seconds) * 1000;
        sessionBag = BagFactory.getInstance().newDefaultBag();
    }

    @Override
    public DataBag exec(Tuple tuple) throws IOException {
        accumulate(tuple);
        DataBag bag = getValue();
        cleanup();
        return bag;
    }

    @Override
    public void accumulate(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return;
        }
        DataBag inputBag = (DataBag) tuple.get(0);
        for (Tuple t : inputBag) {
            Long timestamp = (Long) t.get(1);
            // A record starts a new session if it is the first record seen,
            // or if it arrives after the inactivity timeout has elapsed.
            if (lastSession == null) {
                sessionBag.add(t);
            } else if ((timestamp - lastSession) >= sessionLength) {
                sessionBag.add(t);
            }
            lastSession = timestamp;
        }
    }

    @Override
    public DataBag getValue() {
        return sessionBag;
    }

    @Override
    public void cleanup() {
        lastSession = null;
        sessionBag = BagFactory.getInstance().newDefaultBag();
    }
}
Next, write a Pig script that registers the UDF and groups the web server log records by IP address:

register myjar.jar;
define Sessionize com.packt.ch3.etl.pig.Sessionize('1800'); /* 30 minutes */

nobots_weblogs = LOAD '/user/hadoop/apache_nobots_tsv.txt' AS (ip: chararray, timestamp: long, page: chararray, http_status: int, payload_size: int, useragent: chararray);
ip_groups = GROUP nobots_weblogs BY ip;
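Conceptually, GROUP nobots_weblogs BY ip produces one record per distinct IP whose second field is a bag of all of that IP's log tuples. A rough in-memory analogue (the class and method names here are illustrative, not part of Pig):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByIpDemo {
    // Mimics GROUP nobots_weblogs BY ip: one "bag" of timestamps per IP key.
    // Each input row is a String[] of {ip, timestamp}.
    static Map<String, List<Long>> groupByIp(List<String[]> rows) {
        Map<String, List<Long>> groups = new TreeMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>())
                  .add(Long.parseLong(row[1]));
        }
        return groups;
    }
}
```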
Then, generate all of the sessions for each IP by calling the Sessionize UDF:

sessions = FOREACH ip_groups {
    ordered_by_timestamp = ORDER nobots_weblogs BY timestamp;
    GENERATE FLATTEN(Sessionize(ordered_by_timestamp));
}
STORE sessions INTO '/user/jowens/sessions';
Finally, copy the JAR file containing the Sessionize class to the current working directory, and run the Pig script:

$ pig -f sessionize.pig
We first created a UDF that extends the EvalFunc abstract class and implements the Accumulator interface. The EvalFunc class is used to create our own function that can be called within a Pig script; data is passed to the UDF via the exec(Tuple t) method, where it is processed. The Accumulator interface is optional for custom eval functions, and allows Pig to optimize the data flow and memory utilization of the UDF. Instead of the whole dataset being passed at once, as with the EvalFunc class alone, the Accumulator interface allows subsets of the data to be passed to the UDF incrementally.
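The interplay of accumulate(), getValue(), and cleanup() can be sketched without Pig's classes. The following is a minimal, self-contained analogue of the Accumulator contract applied to the session logic; the interface and class names are illustrative, not Pig's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative analogue of Pig's Accumulator contract: the framework feeds
// input in batches via accumulate(), asks for the result with getValue(),
// and resets state with cleanup() before the next group is processed.
interface SimpleAccumulator<T> {
    void accumulate(List<Long> batch);
    T getValue();
    void cleanup();
}

class SessionStarts implements SimpleAccumulator<List<Long>> {
    private final long sessionLength;
    private Long lastSeen = null;
    private List<Long> starts = new ArrayList<>();

    SessionStarts(long seconds) {
        this.sessionLength = seconds * 1000L;
    }

    @Override
    public void accumulate(List<Long> batch) {
        for (long ts : batch) {
            // Same rule as the UDF: a timestamp starts a session if it is
            // the first one seen, or arrives after the inactivity timeout.
            if (lastSeen == null || ts - lastSeen >= sessionLength) {
                starts.add(ts);
            }
            lastSeen = ts;
        }
    }

    @Override
    public List<Long> getValue() { return starts; }

    @Override
    public void cleanup() {
        lastSeen = null;
        starts = new ArrayList<>();
    }
}
```

Because the timestamps arrive in batches rather than as one bag, the UDF only ever needs to remember the last timestamp seen, which is what makes the Accumulator optimization memory-friendly.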
Next, we wrote a Pig script to group all of the web server log records by IP, and sort the records by timestamp. We need the data sorted by timestamp because the Sessionize
UDF uses the sorted order of the timestamps to determine the start of each session.
Then, we generated all of the sessions associated with a specific IP by calling the Sessionize
alias.
Finally, we used the FLATTEN
operator to unnest the Tuples in the DataBags emitted from the UDF.
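The effect of FLATTEN can be illustrated with plain collections: without it, each output row would contain an entire bag of session-start tuples; with it, each tuple in the bag becomes its own top-level row. A minimal sketch (not Pig code; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenDemo {
    // FLATTEN(bag) emits one output row per tuple in the bag, rather than
    // a single row whose only field is the whole nested bag.
    static List<String> flatten(List<List<String>> bags) {
        List<String> rows = new ArrayList<>();
        for (List<String> bag : bags) {
            rows.addAll(bag);
        }
        return rows;
    }
}
```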