Custom plugin development

As discussed earlier, RabbitMQ lets us develop our own plugins. Sometimes we need access to internal RabbitMQ functionality that the AMQP interface does not expose; in those cases, we need to design and develop a custom plugin.

Suppose we have decided to develop a custom RabbitMQ plugin. First, we should know the requirements. As RabbitMQ is written in Erlang, we have to know the Erlang system and its design principles. After that, we have to know RabbitMQ's internal API in order to call it from a plugin. To access this API, we need a working RabbitMQ development environment. Therefore, we download all of the source code with the Mercurial source code management tool and build it using the following commands:

hg clone http://hg.rabbitmq.com/rabbitmq-public-umbrella/
cd rabbitmq-public-umbrella
make co
make

The output of building the public umbrella is as follows:

[Figure: Making RabbitMQ from source code]

After preparing the development environment, we are now ready to talk about the basics of Erlang.

Basics of Erlang

Erlang was developed by Ericsson for telecom projects that required distributed, real-time, highly available applications. It is a general-purpose, concurrent, garbage-collected programming language and runtime system, designed from the start for concurrency and distribution. The first version of Erlang appeared in 1986, and the first open source version was released in 1998.

Erlang relies on a very simple concurrency model in which many lightweight processes run independently, so the same block of code can be executed many times concurrently on the same host. On top of this concurrency model, Erlang provides a failure model for handling errors in processes. As a result, distributed, scalable, and highly fault-tolerant software systems are comparatively easy to develop in Erlang.

Erlang is a functional programming language, in the same family as Clojure, Scala, and so on. After this brief introduction, let's move on to the basics of Erlang.

If you installed RabbitMQ Server on your computer by following Chapter 1, Getting Started, you already have the Erlang runtime environment. If not, please turn back to Chapter 1, Getting Started, and read the installation instructions. Run the following command in your Terminal or Command Prompt:

$ erl

You will get the following Erlang shell:

Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
1>

Then, if you type an expression that adds two integers, end it with a period (.), and press the Enter key, you'll get the sum of these integers as follows:

1> 10 + 27.
37

Single-line comments in Erlang start with the percent (%) symbol:

% This is a comment

Variables and expressions

Variables in Erlang are dynamically typed, as in languages such as Python and Ruby; however, a variable name must start with an uppercase letter (or an underscore) and can be bound only once. Numbers are divided into two types, integers and floats, as shown in the following code:

2> 3+4.
7
3> 3.5 * 3.6. 
12.6

Strings are written in double quotation marks; single quotation marks denote atoms, not strings. Moreover, Erlang has lots of helper functions for strings:

5> "ahmet".
"ahmet"
8> string:substr("ahmet",1,3).
"ahm"
9> string:join(["one","two","three"],", ").
"one, two, three"

The substr() function returns the substring of "ahmet" that starts at the given position (1) and has the given length (3); positions are counted from 1. The join() function takes a list of strings and joins them with a separator. The string module provides many more utility functions.

Erlang supports the usual comparison expressions, such as equality, greater than, and less than, each of which evaluates to true or false. Examples are listed in the following command lines:

10> "abc" < "def".
true
11> "abc" == "def".
false
12> 5 == 5.
true

Lastly, Erlang has a data type that many other programming languages lack: the atom. Atoms are similar to #define constants in C: they are named constants whose value is their own name, and they are mostly used for comparison and pattern matching. The Boolean values true and false are themselves atoms. We'll see atoms in use in the Functions and modules section.
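
To make the idea of atoms concrete, the following shell session is a minimal sketch. The atoms hello and world are arbitrary names chosen for illustration; is_atom() and atom_to_list() are built-in functions:

```erlang
1> is_atom(hello).
true
2> hello == hello.
true
3> hello == world.
false
4> atom_to_list(hello).
"hello"
5> is_atom('hello world').
true
6> is_atom("hello").
false
```

An atom normally starts with a lowercase letter; to include spaces or a leading uppercase letter, enclose it in single quotation marks, as in the fifth expression.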

Tuples and lists

A tuple is a compound data type that stores a fixed number of elements. It is similar to Python's tuples. The following examples show a tuple and some of the built-in functions that Erlang provides for it:

1> T = {test,32,{12,23,"emrah"}}.
{test,32,{12,23,"emrah"}}
2> element(1,T).
test
3> setelement(2,T,23).
{test,23,{12,23,"emrah"}}
4> tuple_size(T).
3

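The element() function returns the element at the given index; indexes start at 1. The setelement() function returns a new tuple in which the element at the given index is replaced with the new value; the original tuple T is left unchanged. Lastly, the tuple_size() function returns the number of elements in the tuple.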
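
As a small sketch of these points, the following shell session (started fresh, so that no variables are bound yet) shows that setelement() leaves the original tuple untouched and that a tuple can also be taken apart by pattern matching. The variable names T2, Tag, Number, and Name are chosen only for illustration:

```erlang
1> T = {test,32,{12,23,"emrah"}}.
{test,32,{12,23,"emrah"}}
2> T2 = setelement(2,T,23).
{test,23,{12,23,"emrah"}}
3> T.
{test,32,{12,23,"emrah"}}
4> {Tag,Number,{_,_,Name}} = T.
{test,32,{12,23,"emrah"}}
5> {Tag,Number,Name}.
{test,32,"emrah"}
```

The underscore in the fourth expression matches any value without binding it to a variable.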

A list is a compound data type that stores a variable number of elements. It is similar to a tuple; however, some properties of lists give them advantages over tuples. A non-empty list has a head and tail structure: the head is the first element, and the tail is the list of the remaining elements. The head and tail of a list are written as [H|T]. The following examples show a list and some of the helper functions that Erlang provides for it:

1> L1 = [a,2,{c,4}].
[a,2,{c,4}]
2> [H|T] = L1.
[a,2,{c,4}]
3> H.
a
4> T.
[2,{c,4}]
5> L2 = [d|T].
[d,2,{c,4}]
6> length(L1).
3
7> lists:append(L1,[5]).
[a,2,{c,4},5]

We first bind a list to a variable. In Erlang, once a variable is bound, it cannot be assigned again. In the second example, we split the list into its head and its tail. Moreover, the length() function calculates the length of a list. Additionally, Erlang provides lots of list utility functions, and the append() function is one of them. It concatenates two lists, so a single element must be wrapped in a list before it is appended; passing a bare element, as in lists:append([L1,5]), would produce the improper list [a,2,{c,4}|5].
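
The following shell session is a short sketch of the same ideas in a fresh shell: building new lists from an existing one with [H|T] and the ++ operator, and matching more than one element at the front of a list. The variable names are arbitrary:

```erlang
1> L = [1,2,3].
[1,2,3]
2> [0|L].
[0,1,2,3]
3> L ++ [4].
[1,2,3,4]
4> L.
[1,2,3]
5> [First,Second|Rest] = L.
[1,2,3]
6> {First,Second,Rest}.
{1,2,[3]}
```

Note that L still holds its original value in the fourth expression: prepending and concatenating create new lists rather than modifying L.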

Functions and modules

To make our components reusable, we need a structure that groups functions, attributes, and so on. In Erlang, this structure is the module. A module consists of attributes and function declarations. The following example shows a factorial function within a module:

%File Name: fact.erl
-module(fact).       % module attribute
-export([fact/1]).   % export attribute
fact(N) when N>0 ->  % beginning of function declaration
  N * fact(N-1);
fact(0) ->
  1.                 % end of function declaration

Looking at fact.erl in detail, we can see the module attribute, the export attribute, and a function with its clauses. The module attribute names the module; the name must match the filename and is used when calling the module's functions. The export attribute lists each function that is visible outside the module together with its arity, that is, its number of parameters. To load this code into the Erlang shell, we can use the following:

1> c(fact).
{ok,fact}
2> fact:fact(5).
120

As you can see, we first compile the source code using the c() function. Then, we are able to call the fact function by prefixing it with the module name (fact). Moreover, a function can consist of several clauses that share the same name and number of parameters but match different arguments, so each clause can do something different. The following example code shows such clauses:

%Filename: fib.erl
-module(fib).
-export([fib/1]).

fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 1 -> fib(N-1) + fib(N-2).

Finally, if we compile and run the Fibonacci sequence code, we'll get the following result in our command line:

2> c(fib).
{ok,fib}
3> fib:fib(5).
5
4> fib:fib(10).
55

A function call in Erlang is resolved by matching the supplied arguments against the parameter patterns of each clause in order; the runtime executes the first clause that matches. As you can see, this style fits recursive algorithms well. The last example code shows how the well-known merge sort can be written in four lines of code:

%Filename: sorting.erl
-module(sorting).
-export([mergeSort/1]).

mergeSort(L) when length(L) =< 1 -> L;   % empty and one-element lists are already sorted
mergeSort(L) when length(L) > 1 ->
  {L1, L2} = lists:split(length(L) div 2, L),
  lists:merge(mergeSort(L1), mergeSort(L2)).

Compiling and running the module gives the following output:

2> c(sorting).
{ok,sorting}
3> sorting:mergeSort([1,34,21,22,42,55]).
[1,21,22,34,42,55]
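
Clause matching also works on the shape of the data, not only on numbers. The following module is a minimal sketch of that idea, using an atom as a tag in the first position of a tuple; the module name shapes, the function area, and the tags square, rectangle, and circle are hypothetical names introduced only for this illustration:

```erlang
%% Filename: shapes.erl (hypothetical module, for illustration only)
-module(shapes).
-export([area/1]).

%% The atom in the first position of each tuple selects the clause.
area({square, Side}) ->
  Side * Side;
area({rectangle, Width, Height}) ->
  Width * Height;
area({circle, Radius}) ->
  math:pi() * Radius * Radius.
```

Under these assumptions, shapes:area({square,3}) returns 9 and shapes:area({rectangle,2,5}) returns 10, while a tuple that matches no clause, such as {triangle,3}, raises a function_clause error.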

Conditionals

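Erlang has an if expression; however, its structure is somewhat different from other programming languages. Each branch is a guard followed by ->, the first branch whose guard evaluates to true is executed, and a runtime error occurs if no guard is true. The structure of the if expression can be seen in the following example: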

%Filename comp.erl
-module(comp).
-export([compare/2]).

compare(A,B) ->
  if
    A > B ->
      a_more_than_b;
    B > A ->
      b_more_than_a;
    A == B ->
      a_is_equal_to_b
  end.

After compiling the preceding code in the Erlang shell and calling it, you will get the following output:

3> c(comp).
{ok,comp}
4> comp:compare(10,5).
a_more_than_b
5> comp:compare(5,5). 
a_is_equal_to_b
6> comp:compare(5,7).
b_more_than_a
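
The same comparison can also be expressed without if, by putting the conditions into guards on separate function clauses. The following is only a sketch of that alternative; the module name comp_guard is hypothetical, and the returned atoms are the ones used in comp.erl:

```erlang
%% Filename: comp_guard.erl (hypothetical module name)
-module(comp_guard).
-export([compare/2]).

%% Clauses are tried from top to bottom; the first one whose
%% guard holds is executed.
compare(A, B) when A > B ->
  a_more_than_b;
compare(A, B) when B > A ->
  b_more_than_a;
%% Neither argument is greater, so the two compare as equal.
compare(_A, _B) ->
  a_is_equal_to_b.
```

Calling comp_guard:compare(10,5), comp_guard:compare(5,5), and comp_guard:compare(5,7) returns the same atoms as the comp module. The last clause needs no guard because it is reached only when neither of the first two guards is true.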

Looping in Erlang

As Erlang makes it easy to write code recursively, we don't need the for loops found in other programming languages. Therefore, if we need to iterate over a list or any other data structure, all we need to do is write a recursive function.

The following example shows how to sum up all the elements within the list:

%Filename: sum.erl
-module(sum).
-export([sum/1]).

sum([]) ->
  0;
sum([H|T]) ->
  H + sum(T).

Note that H and T are not separate arguments: sum() takes a single list, and the pattern [H|T] binds H to its head and T to its tail.

Now, we are ready to compile and run the module in the Erlang shell:

12> c(sum).
{ok,sum}
13> sum:sum([1,3,2,4435,232,1]).
4674
14> sum:sum([]).
0
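
For long lists, a common variation is to carry the running total in an extra argument, so that the recursive call is the last thing each clause does. The following is a minimal sketch of that accumulator style, not a replacement for sum.erl; the module name sum_acc and the helper sum/2 are hypothetical names:

```erlang
%% Filename: sum_acc.erl (hypothetical module name)
-module(sum_acc).
-export([sum/1]).

%% Public entry point: start with an accumulator of 0.
sum(L) ->
  sum(L, 0).

%% Helper with an accumulator (not exported). The recursive call is
%% the last operation, so no work is left pending on the stack.
sum([], Acc) ->
  Acc;
sum([H|T], Acc) ->
  sum(T, Acc + H).
```

With this module, sum_acc:sum([1,3,2,4435,232,1]) returns 4674 and sum_acc:sum([]) returns 0, the same results as before.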

Erlang also provides helper functions for its data structures. The foreach() function is one of the helper functions of the lists module; it calls the given function once for every element of a list. The following code is an example of the foreach() function:

%Filename: iter.erl
-module(iter).
-export([iter/1]).

iter(L) ->
  lists:foreach(fun
    (N) ->
      io:format("Value: ~p ",[N])
  end, L).

Now, we can compile and run the foreach code, as shown in the following example:

16> c(iter).
{ok,iter}
17> iter:iter([1,2,34,3,25,24]).
Value: 1 Value: 2 Value: 34 Value: 3 Value: 25 Value: 24 ok
18> iter:iter([1,6,32,32,2,34,13,67,25,24]).
Value: 1 Value: 6 Value: 32 Value: 32 Value: 2 Value: 34 Value: 13 Value: 67 Value: 25 Value: 24 ok
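
The lists module has further helpers that take a function in the same way. As a brief sketch, the following shell session uses lists:map(), lists:filter(), and lists:foldl() from the standard library; the anonymous functions and the input list are arbitrary examples:

```erlang
1> lists:map(fun(N) -> N * 2 end, [1,2,3]).
[2,4,6]
2> lists:filter(fun(N) -> N > 1 end, [1,2,3]).
[2,3]
3> lists:foldl(fun(N, Acc) -> N + Acc end, 0, [1,2,3]).
6
```

Unlike foreach(), which is used for its side effects and returns ok, these functions return a new value built from the list.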

Concurrent programming

As explained earlier, one of the main reasons for developing in Erlang is its capacity for concurrent and distributed programming. With concurrency, a program can run as numerous threads of execution. Erlang makes it easy to create such threads and let them communicate with each other. Erlang has no mutable data structures and its threads share no memory, which means no locks are needed. This removes a lot of the complexity of concurrent programming. It also means that every time you think you are changing a variable, you are actually getting a new value bound to a new variable, not changing the existing one.

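Erlang calls its threads of execution processes. They are lightweight and are scheduled by the Erlang runtime rather than by the operating system. A new process is created using the spawn() function, which returns the process identifier (PID) of the new process. The definition of the spawn function is as follows: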

spawn(Module, Function, Arguments)

In the following example, you can see how easily processes are created using spawn:

% Filename: talk.erl
-module(talk).
-export([talk/2,run_concurrently/0]).

talk(Word, 0) ->
  done;
talk(Word, N) ->
  io:format("~p~n",[Word]),
  talk(Word, N - 1).

run_concurrently() ->
  spawn(talk, talk, [hello, 5]),
  spawn(talk, talk, [world, 4]).

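After compiling and running the module, the shell shows the following output. The compiler warning appears because the first clause of talk never uses Word; naming it _Word would silence the warning. The PID in the middle of the output is the return value of run_concurrently(), and the exact interleaving of the lines may differ from run to run: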

8> c(talk). 
talk.erl:4: Warning: variable 'Word' is unused
{ok,talk}
9> talk:run_concurrently().
hello
world
hello
world
<0.74.0>
hello
world
hello
world
hello

Erlang also lets processes send messages to and receive messages from each other. The receive construct has one role: to let a process wait for messages from other processes. Incoming messages are matched against the patterns in order, and the actions of the first matching pattern are executed. The structure of receive is as follows:

receive
  pattern1 ->
    actions1;
  pattern2 ->
    actions2;
  pattern3 ->
    actions3
end.

A message is sent with the "!" operator. The syntax of "!" is as follows:

Pid ! Message

The following code is an example of sending and receiving messages between processes using the receive construct:

% Filename: msg.erl
-module(msg).
-export([sender_func/2,receiver_func/0,start_func/0]).

sender_func(0, Receiver_PID) ->
  Receiver_PID ! finished,
  io:format("Sender is Finished~n",[]);
sender_func(N, Receiver_PID) ->
  Receiver_PID ! {sender_func, self()},   % include our own PID so the receiver can reply
  receive
    receiver_func ->
      io:format("Sender received message~n",[])
  end,
  sender_func(N-1, Receiver_PID).

receiver_func() ->
  receive
    finished ->
      io:format("Receiver finished~n",[]);
    {sender_func, Sender_PID} ->
      io:format("Receiver receives message~n",[]),
      Sender_PID ! receiver_func,
      receiver_func()
  end.

start_func() ->
  Receiver_PID = spawn(msg, receiver_func, []),
  spawn(msg, sender_func, [5, Receiver_PID]).	

As you can see in the preceding example code, we have two functions: sender_func sends the messages recursively, while receiver_func receives the messages and prints a line for each. Whenever we want to send a message to another process, we have to address it to the destination's PID, where PID means process identifier. That is why the sender includes its own PID, obtained with self(), in the message it sends: the receiver needs it in order to reply. Moreover, you can see that the receive construct filters incoming messages by pattern inside the process functions. The following command line shows the compiled message code and its output:

5> c(msg). 
{ok,msg}
6> msg:start_func().
Receiver receives message
<0.55.0>
Sender received message
Receiver receives message
Sender received message
Receiver receives message
Sender received message
Receiver receives message
Sender received message
Receiver receives message
Sender received message
Sender is Finished
Receiver finished
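
A receive can also give up waiting after a while by using an after clause. The following module is a minimal sketch of a request and reply exchange with a timeout; the module name echo, its functions, and the one-second timeout are assumptions made only for this illustration:

```erlang
%% Filename: echo.erl (hypothetical module, for illustration only)
-module(echo).
-export([start/0, loop/0, call/2]).

%% Spawn the echo process and return its PID.
start() ->
  spawn(echo, loop, []).

%% Reply to every {From, Msg} request; exit the loop on stop.
loop() ->
  receive
    {From, Msg} ->
      From ! {self(), Msg},
      loop();
    stop ->
      ok
  end.

%% Send a request and wait up to 1000 milliseconds for the reply.
call(Pid, Msg) ->
  Pid ! {self(), Msg},
  receive
    {Pid, Reply} ->
      {ok, Reply}
  after 1000 ->
    timeout
  end.
```

After Pid = echo:start(), the call echo:call(Pid, hello) returns {ok,hello}. If the process has been stopped with Pid ! stop, the same call returns timeout after one second instead of blocking forever.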

Simple RabbitMQ metronome plugin

Now that we have learned the basics of writing code in Erlang, our final task is to look at a complete RabbitMQ plugin called Metronome, which is the example plugin in RabbitMQ's official plugin development guide. The guide is published at https://www.rabbitmq.com/plugin-development.html.

The Metronome plugin simply declares an exchange called "metronome" and publishes a message to it every second with a routing key in the form yyyy.MM.dd.dow.hh.mm.ss. Therefore, a RabbitMQ client receives these messages on any queue that is bound to this exchange with a matching binding key such as "*.*.*.*.*.*.20", "2014.*.*.*.*.*.*", and so on.

You can download the metronome plugin from the rabbitmq-metronome repository in RabbitMQ's official Mercurial repository into your RabbitMQ development environment. After that, you need to build the plugin with make and enable it. Finally, run the RabbitMQ Server, and you will see that RabbitMQ starts the rabbitmq-metronome plugin. The following command lines show the process of running the metronome plugin, followed by the output of rabbitmqctl status once the server is up:

hg clone http://hg.rabbitmq.com/rabbitmq-metronome/
make

mkdir -p rabbitmq-server/plugins-folder
cd rabbitmq-server/plugins-folder
ln -s rabbitmq-erlang-client
ln -s rabbitmq-metronome
scripts/rabbitmq-plugins enable rabbitmq_metronome

make run

vagrant@precise32:~$ sudo rabbitmqctl status
Status of node rabbit@precise32 ...
[{pid,844},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.0.4"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.0.4"},
      {rabbitmq_metronome,"Embedded Rabbit Metronome","0.01"},
      {webmachine,"webmachine","1.9.1-rmq3.0.4-git52e62bc"},
      {mochiweb,"MochiMedia Web Server","2.3.1-rmq3.0.4-gitd541e9a"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.0.4"},
      {rabbit,"RabbitMQ","3.0.4"},
      {os_mon,"CPO  CXC 138 46","2.2.7"},
      {inets,"INETS  CXC 138 49","5.7.1"},
      {xmerl,"XML parser","1.2.10"},
      {mnesia,"MNESIA  CXC 138 12","4.5"},
      {amqp_client,"RabbitMQ AMQP Client","3.0.4"},
      {sasl,"SASL  CXC 138 11","2.1.10"},
      {stdlib,"ERTS  CXC 138 10","1.17.5"},
      {kernel,"ERTS  CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang R14B04 (erts-5.8.5) [source] [rq:1] [async-threads:30] [kernel-poll:true]\n"},
 {memory,
     [{total,16283792},
      {connection_procs,2728},
      {queue_procs,25080},
      {plugins,48952},
      {other_proc,4756524},
      {mnesia,31508},
      {mgmt_db,25444},
      {msg_index,11208},
      {other_ets,521060},
      {binary,2784},
      {code,9136933},
      {atom,1027009},
      {other_system,694562}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,154828800},
 {disk_free_limit,1000000000},
 {disk_free,77275533312},
 {file_descriptors,
     [{total_limit,924},{total_used,5},{sockets_limit,829},{sockets_used,1}]},
 {processes,[{limit,1048576},{used,190}]},
 {run_queue,0},
 {uptime,206}]
...done.

Now that we have seen our custom plugin rabbitmq-metronome running on the RabbitMQ Server, let's move on to the details of this plugin and its code.

First, the following table gives an overview of each source file in rabbitmq-metronome. Apart from the .app.src file, each file is named after the module it contains:

Filename | Description
rabbitmq_metronome.app.src | Defines the dependencies and the application properties such as its name, its version, and so on.
rabbit_metronome.erl | Implements the Erlang "application" behavior; starts and stops the plugin within the Erlang VM.
rabbit_metronome_sup.erl | Implements the Erlang "supervisor" behavior; monitors the worker process and restarts it if it crashes.
rabbit_metronome_worker.erl | The core of the plugin, where all of the work is done. In the metronome plugin, this code connects to the RabbitMQ Server and creates a task that is triggered every second.
rabbit_metronome_tests.erl | Contains the tests of the plugin.

You can run the tests with the following command line:

make test

After this overview of the code inside rabbitmq-metronome, we will now go into the details of the important files, starting with rabbitmq_metronome.app.src:

% Filename: rabbitmq_metronome.app.src
{application, rabbitmq_metronome,
   [{description, "Embedded Rabbit Metronome"},
    {vsn, "0.01"},
    {modules, []},
    {registered, []},
    {mod, {rabbit_metronome, []}},
    {env, []},
    {applications, [kernel, stdlib, rabbit, amqp_client]}]}.

As we can see, this file simply defines the application name, description, version, modules, environment variables, and dependencies. The mod entry names the callback module (rabbit_metronome) that starts the application. Every OTP application needs a resource file of this kind to describe it.

The following code shows the main functionality code called rabbit_metronome_worker. First we will look at the code, and then we'll discuss the code in detail:

%% Filename: rabbit_metronome_worker.erl
%% Copyright (c) 2007-2013 GoPivotal, Inc.
%% You may use this code for any purpose.

-module(rabbit_metronome_worker).
-behaviour(gen_server).

-export([start_link/0]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-export([fire/0]).

-include_lib("amqp_client/include/amqp_client.hrl").

-record(state, {channel}).

-define(RKFormat,
        "~4.10.0B.~2.10.0B.~2.10.0B.~1.10.0B.~2.10.0B.~2.10.0B.~2.10.0B").

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

%---------------------------
% Gen Server Implementation
% --------------------------

init([]) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_direct{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"metronome">>,type = <<"topic">>}),
    fire(),
    {ok, #state{channel = Channel}}.

handle_call(_Msg, _From, State) ->
    {reply, unknown_command, State}.

handle_cast(fire, State = #state{channel = Channel}) ->
    Properties = #'P_basic'{content_type = <<"text/plain">>, delivery_mode = 1},
    {Date={Year,Month,Day},{Hour, Min,Sec}} = erlang:universaltime(),
    DayOfWeek = calendar:day_of_the_week(Date),
    RoutingKey = list_to_binary(
                   io_lib:format(?RKFormat, [Year, Month, Day,
                                 DayOfWeek, Hour, Min, Sec])),
    Message = RoutingKey,
    BasicPublish = #'basic.publish'{exchange = <<"metronome">>,
                                    routing_key = RoutingKey},
    Content = #amqp_msg{props = Properties, payload = Message},
    amqp_channel:call(Channel, BasicPublish, Content),
    timer:apply_after(1000, ?MODULE, fire, []),
    {noreply, State};

handle_cast(_, State) ->
    {noreply,State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_, #state{channel = Channel}) ->
    amqp_channel:call(Channel, #'channel.close'{}),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%---------------------------

fire() ->
    gen_server:cast({global, ?MODULE}, fire).

The preceding code opens a connection and a channel to the RabbitMQ Server when the worker is initialized and declares the metronome exchange. Then, every second, it publishes a message to that exchange with a routing key built from the current UTC date and time; the message payload is the routing key itself. When the worker terminates, it closes its channel.

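The connection to the server is opened in the init() function. The functions whose names start with "handle" are gen_server callbacks: handle_call() handles synchronous requests, handle_cast() handles asynchronous requests, and handle_info() handles all other messages sent to the process. The fire() function casts the fire message to the worker, so the message-publishing code lives in the handle_cast() clause that matches fire; that clause also schedules the next call to fire() after 1,000 milliseconds. Finally, the channel is closed in the terminate() function.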
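
To see what the routing key looks like without running a broker, the formatting step of the worker can be tried on its own. The following module is only a sketch for experimenting in the shell: the module name rk_demo and the function routing_key/1 are hypothetical and not part of the plugin, while the format string is copied from the RKFormat macro of the worker:

```erlang
%% Filename: rk_demo.erl (hypothetical module, not part of the plugin)
-module(rk_demo).
-export([routing_key/1]).

%% Same format string as the RKFormat macro in the worker:
%% zero-padded decimal fields separated by periods.
-define(RKFormat,
        "~4.10.0B.~2.10.0B.~2.10.0B.~1.10.0B.~2.10.0B.~2.10.0B.~2.10.0B").

%% Takes a {Date, Time} tuple such as the one returned by
%% erlang:universaltime() and returns the routing key as a binary.
routing_key({Date = {Year, Month, Day}, {Hour, Min, Sec}}) ->
  DayOfWeek = calendar:day_of_the_week(Date),   % 1 = Monday ... 7 = Sunday
  list_to_binary(
    io_lib:format(?RKFormat, [Year, Month, Day,
                              DayOfWeek, Hour, Min, Sec])).
```

For example, rk_demo:routing_key({{2014,3,5},{9,7,20}}) returns <<"2014.03.05.3.09.07.20">>, because 5 March 2014 was a Wednesday. A queue bound with "*.*.*.*.*.*.20" or "2014.*.*.*.*.*.*" would receive a message published with this key.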

The following code shows the supervisor code of the plugin. As said earlier, the supervisor module monitors the functionality of the worker. First we'll look at the source code, and then we'll dive into the code details:

%% Copyright (c) 2007-2013 GoPivotal, Inc.
%% You may use this code for any purpose.

-module(rabbit_metronome_sup).

-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []).

init([]) ->
    {ok, {{one_for_one, 3, 10},
          [{rabbit_metronome_worker,
            {rabbit_metronome_worker, start_link, []},
            permanent,
            10000,
            worker,
            [rabbit_metronome_worker]}
          ]}}.

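The supervisor code just monitors the worker. It is started with the start_link() function, and its init() function returns the restart strategy and the child specification: with one_for_one, a crashed worker is restarted on its own, and the supervisor gives up if more than 3 restarts occur within 10 seconds. The following code describes starting and stopping the rabbitmq-metronome plugin. RabbitMQ Server calls the start() function when it starts the enabled plugin, and it calls the stop() function when the plugin is stopped: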

%% Copyright (c) 2007-2013 GoPivotal, Inc.
%% You may use this code for any purpose.

-module(rabbit_metronome).

-behaviour(application).

-export([start/2, stop/1]).

start(normal, []) ->
    rabbit_metronome_sup:start_link().

stop(_State) ->
    ok.