As discussed earlier, RabbitMQ lets us develop our own plugins. Sometimes we need to access internal functionality of RabbitMQ that is not exposed through the AMQP interface. In such cases, we need to design and develop a custom plugin.
Having decided to develop a custom RabbitMQ plugin, we should first know what custom plugin development requires. As RabbitMQ is written in Erlang, we have to know the Erlang system and its design principles first. After that, we have to know the internal API of RabbitMQ so that we can use it in the plugin. To access the RabbitMQ APIs, we need a working RabbitMQ development environment. Therefore, we download all of the source code using the Mercurial source code management tool and build it using the following commands:
hg clone http://hg.rabbitmq.com/rabbitmq-public-umbrella/
cd rabbitmq-public-umbrella
make co
make
Output of the public umbrella plugin is as follows:
After preparing the development environment, we are now ready to talk about the basics of Erlang.
Erlang was developed by Ericsson for telecom projects that require distributed, real-time, highly available applications. The first version of Erlang was released in 1986, and the first open source version was released in 1998. Erlang is a general-purpose, concurrent, garbage-collected programming language and runtime system, designed with concurrency and distribution in mind.
Erlang relies on a very simple concurrency model that allows individual blocks of code to be executed many times concurrently on the same host. Additionally, Erlang builds a failure model on top of its concurrency model to handle errors in processes. As a result, distributed, scalable, and highly fault-tolerant software systems can be developed comparatively easily in Erlang.
Erlang is a functional programming language, like Clojure, Scala, and so on. After this brief introduction, let's now move on to the basics of Erlang.
If you installed the RabbitMQ Server on your computer with the help of Chapter 1, Getting Started, you also have the Erlang runtime environment on your computer. If not, please go back to Chapter 1, Getting Started, and read the installation instructions. Run the following command in your Terminal or Command Prompt:
$ erl
You will get the following Erlang shell:
Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4 (abort with ^G)
1>
Then, if you type an expression that adds two integers, end it with a period (.), and press the Enter key, you'll get the sum of these integers as follows:
1> 10 + 27.
37
Single-line comments in Erlang start with the percent (%) symbol, for example:
% This is a comment
Erlang is dynamically typed, so a variable can hold a value of any type, as in other dynamic languages such as Python, Ruby, and so on. Note that variable names start with an uppercase letter. Numbers come in two types, integers and floats, as shown in the following code:
2> 3+4.
7
3> 3.5 * 3.6.
12.6
Strings are written within double quotation marks; single quotation marks denote atoms, not strings. Moreover, Erlang has lots of helper functions for strings:
5> "ahmet".
"ahmet"
8> string:substr("ahmet",1,3).
"ahm"
9> string:join(["one","two","three"],", ").
"one, two, three"
The substr() function takes the substring of the string ("ahmet") that starts at the given position and has the given length. The join() function takes a list of strings and joins them with a separator. Erlang has many more utility functions for strings.
Erlang supports the usual comparison expressions such as equality, greater than, less than, and so on, which evaluate to the Boolean values true and false. Examples are listed in the following command lines:
10> "abc" < "def".
true
11> "abc" == "def".
false
12> 5 == 5.
true
Lastly, Erlang has a data type that many other programming languages lack, called atoms. Atoms are similar to #define values in C: they are named constants, and they are mostly used for comparison. We'll see the usage of atoms in the Function and Modules section.
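As a quick illustration of atoms being used for comparison, here is a minimal sketch. The module name atoms_demo and the function describe/1 are hypothetical names chosen only for this example:

```erlang
%% Filename: atoms_demo.erl (hypothetical module, for illustration only)
-module(atoms_demo).
-export([describe/1]).

%% Each clause matches one specific atom, much like comparing
%% against a named constant.
describe(ok) -> "operation succeeded";
describe(error) -> "operation failed";
%% Any other atom falls through to this clause.
describe(Other) when is_atom(Other) -> "unknown status".
```

Calling atoms_demo:describe(ok) selects the first clause, while an atom such as pending is handled by the last one.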
A tuple is a compound data type that stores a fixed number of elements. It is similar to Python's tuples. The following examples show a tuple and the utility functions that Erlang provides for it:
1> T = {test,32,{12,23,"emrah"}}.
{test,32,{12,23,"emrah"}}
2> element(1,T).
test
3> setelement(2,T,23).
{test,23,{12,23,"emrah"}}
4> tuple_size(T).
3
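The element() function gets the element at the given index (indexes start at 1). The setelement() function returns a new tuple in which the element at the given index is replaced with the new value; the original tuple is left unchanged. Lastly, the tuple_size() function returns the number of elements in the tuple.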
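Tuples are often used to tag a result with an atom, a convention that also appears in the plugin code later in this chapter (for example, {ok, Channel}). The following is a minimal sketch of that convention; the module tuple_demo and its functions are hypothetical names introduced only for illustration:

```erlang
%% Filename: tuple_demo.erl (hypothetical module, for illustration only)
-module(tuple_demo).
-export([safe_div/2, unwrap/2]).

%% Returns {error, Reason} instead of crashing when the divisor is zero.
safe_div(_A, B) when B == 0 -> {error, division_by_zero};
safe_div(A, B) -> {ok, A / B}.

%% Pattern matching on the tag picks the right clause.
unwrap({ok, Value}, _Default) -> Value;
unwrap({error, _Reason}, Default) -> Default.
```

With this sketch, tuple_demo:safe_div(5, 2) returns {ok,2.5}, and tuple_demo:unwrap(tuple_demo:safe_div(1, 0), 0) returns the default value 0.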
A list is a compound data type that stores a variable number of elements. It is similar to a tuple; however, some properties of lists give them advantages over tuples. A non-empty list has a head and tail structure: the head is the first element, and the tail is the list of the remaining elements. The head and tail of a list are written as follows: [H|T]. The following examples show a list and the helper functions that Erlang provides for it:
1> L1 = [a,2,{c,4}].
[a,2,{c,4}]
2> [H|T] = L1.
[a,2,{c,4}]
3> H.
a
4> T.
[2,{c,4}]
5> L2 = [d|T].
[d,2,{c,4}]
6> length(L1).
3
7> lists:append(L1,[5]).
[a,2,{c,4},5]
We first bind a list to a variable. In Erlang, once a variable is bound, we cannot assign to it again. In the second example, we split the list into its head and its tail. Moreover, we have the length() function to calculate the length of a list. Additionally, Erlang provides lots of list utility functions, and the append() function is one of them. It joins two lists, so to append a single element we wrap it in a list.
To make our code reusable, we need a structure that groups functions, attributes, and so on. In Erlang, we use modules for this. A module in Erlang consists of attributes and function declarations. The following example shows a factorial function within a module:
%File Name: fact.erl
-module(fact). % module attribute
-export([fact/1]). % export attribute

fact(N) when N>0 -> % beginning of function declaration
    N * fact(N-1);
fact(0) ->
    1. % end of function declaration
Looking at fact.erl in detail, we can see the module and export attributes and the function with its clauses. The module attribute gives the module its name, which is used whenever the module is called from other code. The export attribute lists each function that can be called from outside the module, together with its arity, that is, its number of parameters. To compile and load this code in the Erlang shell, we can use the following:
1> c(fact).
{ok,fact}
2> fact:fact(5).
120
As you can see, we first compile the source code using the c function. Then, we are able to call the function fact using the module name (fact). Moreover, a function can consist of several clauses with the same name and the same number of parameters; each clause matches different argument values and can do something different. The following example code shows such a function:
%Filename: fib.erl
-module(fib).
-export([fib/1]).

fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 1 -> fib(N-1) + fib(N-2).
Finally, if we compile and run the Fibonacci sequence code, we'll get the following result in our command line:
2> c(fib).
{ok,fib}
3> fib:fib(5).
5
4> fib:fib(10).
55
When a function is called, Erlang matches the supplied arguments against the patterns of each clause in order, and the runtime executes the first clause that matches. As you can see, Erlang functions fit recursive algorithms well. The last example shows how the well-known Merge Sort can be written in a few lines of code:
%Filename: sorting.erl
-module(sorting).
-export([mergeSort/1]).

% Lists with zero or one element are already sorted.
mergeSort([]) -> [];
mergeSort(L) when length(L) == 1 -> L;
mergeSort(L) when length(L) > 1 ->
    {L1, L2} = lists:split(length(L) div 2, L),
    lists:merge(mergeSort(L1), mergeSort(L2)).
Compiling and running the module gives the following output:
2> c(sorting).
{ok,sorting}
3> sorting:mergeSort([1,34,21,22,42,55]).
[1,21,22,34,42,55]
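The merge sort example relies on lists:merge() to combine two sorted lists. To show how such a step can be expressed with clause matching alone, here is a minimal hand-written sketch. The module merge_demo is a hypothetical name, and this is not how the standard library implements the function:

```erlang
%% Filename: merge_demo.erl (hypothetical module, for illustration only)
-module(merge_demo).
-export([merge/2]).

%% If one side is empty, the other side is already the result.
merge([], Right) -> Right;
merge(Left, []) -> Left;
%% Otherwise take the smaller head first and recurse on the rest.
merge([H1|T1], [H2|_] = Right) when H1 =< H2 ->
    [H1 | merge(T1, Right)];
merge(Left, [H2|T2]) ->
    [H2 | merge(Left, T2)].
```

Both input lists are assumed to be sorted already; merge_demo:merge([1,3],[2,4]) then evaluates to [1,2,3,4].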
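Erlang has if expressions; however, their structure is somewhat different from other programming languages. Each branch is a guard followed by ->, and the first guard that evaluates to true selects the branch. The structure of the if expression can be seen in the following example: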
%Filename comp.erl
-module(comp).
-export([compare/2]).

compare(A,B) ->
    if
        A > B -> a_more_than_b;
        B > A -> b_more_than_a;
        A == B -> a_is_equal_to_b
    end.
After compiling and running the preceding code in the Erlang shell, you will get the following output:
3> c(comp).
{ok,comp}
4> comp:compare(10,5).
a_more_than_b
5> comp:compare(5,5).
a_is_equal_to_b
6> comp:compare(5,7).
b_more_than_a
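The same comparison can also be written without an if expression, by putting the conditions into guards on separate function clauses. The following is a minimal sketch of that alternative style; the module name comp_guard is a hypothetical name used only here:

```erlang
%% Filename: comp_guard.erl (hypothetical module, for illustration only)
-module(comp_guard).
-export([compare/2]).

%% Clauses are tried top to bottom; the first one whose guard holds wins.
compare(A, B) when A > B -> a_more_than_b;
compare(A, B) when A < B -> b_more_than_a;
%% Neither value is greater than the other, so they compare as equal.
compare(_A, _B) -> a_is_equal_to_b.
```

Which style to prefer is largely a matter of taste; guards on clauses tend to read well when each branch is short.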
As Erlang lets us write code easily in a recursive way, we don't need for loops as in other programming languages. Therefore, if we need to iterate over a list or any other data structure, all we need to do is write a recursive function.
The following example shows how to sum up all the elements within the list:
%Filename: sum.erl
-module(sum).
-export([sum/1]).

sum([]) -> 0;
sum([H|T]) -> H + sum(T).
Note that H and T are bound by the pattern [H|T] in the argument of the sum() function: H is the first element of the list and T is the rest of it.
Now, we are ready to compile and run the module in the Erlang shell:
12> c(sum).
{ok,sum}
13> sum:sum([1,3,2,4435,232,1]).
4674
14> sum:sum([]).
0
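For long lists, the recursion above keeps one pending addition per element. A common variation passes the running total along as an extra argument, so that the recursive call is the last thing each clause does. The following is a minimal sketch of that variation; the module name sum_acc is a hypothetical name introduced for illustration:

```erlang
%% Filename: sum_acc.erl (hypothetical module, for illustration only)
-module(sum_acc).
-export([sum/1]).

%% Public entry point: start with an accumulator of 0.
sum(List) -> sum(List, 0).

%% Internal helper (not exported): Acc holds the total so far.
sum([], Acc) -> Acc;
sum([H|T], Acc) -> sum(T, Acc + H).
```

It returns the same results as the sum module, for example 4674 for the list [1,3,2,4435,232,1] and 0 for the empty list.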
Erlang also provides helper functions for its data structures. The foreach() function is one of the helper functions of the lists module. The following code is an example of the foreach function:
%Filename: iter.erl
-module(iter).
-export([iter/1]).

iter(L) ->
    lists:foreach(fun (N) -> io:format("Value: ~p ",[N]) end, L).
Now, we can compile and run the foreach code, as shown in the following example:
16> c(iter).
{ok,iter}
17> iter:iter([1,2,34,3,25,24]).
Value: 1 Value: 2 Value: 34 Value: 3 Value: 25 Value: 24 ok
18> iter:iter([1,6,32,32,2,34,13,67,25,24]).
Value: 1 Value: 6 Value: 32 Value: 32 Value: 2 Value: 34 Value: 13 Value: 67 Value: 25 Value: 24 ok
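The foreach() function is used only for its side effects. When we want a result back, the lists module also offers map() and foldl(). The following is a minimal sketch of both; the module hof_demo and its function names are hypothetical and exist only for this example:

```erlang
%% Filename: hof_demo.erl (hypothetical module, for illustration only)
-module(hof_demo).
-export([doubled/1, total/1]).

%% lists:map/2 applies the fun to every element and returns a new list.
doubled(L) ->
    lists:map(fun (N) -> N * 2 end, L).

%% lists:foldl/3 threads an accumulator through the list, starting at 0.
total(L) ->
    lists:foldl(fun (N, Acc) -> N + Acc end, 0, L).
```

For the list [1,2,3], doubled/1 returns [2,4,6] and total/1 returns 6.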
As explained earlier, one of the main reasons for developing in Erlang is its capacity to handle concurrency and distributed programming. With concurrency, we can write programs that run in numerous threads of execution. Erlang makes it easy to create such parallel threads and to let them communicate with each other. Erlang has no mutable data structures, which means no locks are needed to protect shared data. This removes a lot of the complexity of concurrent programming. It also means that every time you think you are changing a variable, you are actually getting a new value bound to a new variable, not changing the original value.
Erlang calls its threads of execution processes. A new process is created with the spawn() function. The definition of the spawn function is as follows:
spawn(Module, Function, Arguments)
In the following example, you can see how easily processes are created using spawn:
% Filename: talk.erl
-module(talk).
-export([talk/2,run_concurrently/0]).
talk(Word, 0) ->
    done;
talk(Word, N) ->
    io:format("~p~n",[Word]),
    talk(Word, N - 1).

run_concurrently() ->
    spawn(talk, talk, [hello, 5]),
    spawn(talk, talk, [world, 4]).
The following output is shown after compiling the module and running the function:
8> c(talk).
talk.erl:4: Warning: variable 'Word' is unused
{ok,talk}
9> talk:run_concurrently().
hello
world
hello
world
<0.74.0>
hello
world
hello
world
hello
Erlang also gives us the opportunity to send and receive messages between processes. Messages are received using the receive construct, which has one role: to allow a process to wait for messages from other processes. The structure of receive is as follows:
receive
    pattern1 -> actions1;
    pattern2 -> actions2;
    pattern3 -> actions3
end.
A message is sent with the "!" operator. The syntax of "!" is as follows:
Pid ! Message
The following code is an example of sending and receiving messages between processes using the receive construct:
% Filename: msg.erl
-module(msg).
-export([sender_func/2,receiver_func/0,start_func/0]).

% The second argument is the PID of the receiver process.
sender_func(0, Receiver_PID) ->
    Receiver_PID ! finished,
    io:format("Sender is Finished~n",[]);
sender_func(N, Receiver_PID) ->
    % Include our own PID so that the receiver can reply.
    Receiver_PID ! {sender_func, self()},
    receive
        receiver_func ->
            io:format("Sender received message~n",[])
    end,
    sender_func(N-1, Receiver_PID).

receiver_func() ->
    receive
        finished ->
            io:format("Receiver finished~n",[]);
        {sender_func, Sender_PID} ->
            io:format("Receiver receives message~n",[]),
            Sender_PID ! receiver_func,
            receiver_func()
    end.

start_func() ->
    Receiver_PID = spawn(msg, receiver_func, []),
    spawn(msg, sender_func, [5, Receiver_PID]).
As you see in the preceding example code, we have two functions: sender_func sends the messages recursively, while receiver_func receives the messages and prints a line for each of them. Whenever we want to send a message to another process, we have to address it to the destination's PID, where PID stands for process identifier. That is why the sender includes its own PID, obtained with self(), in each message: the receiver uses it to send the reply. Moreover, you can see that the receive construct filters incoming messages by pattern inside the process functions. The following command line shows the compiled message code and its output:
5> c(msg).
{ok,msg}
6> msg:start_func().
Receiver receives message
<0.55.0>
Sender received message
Receiver receives message
Sender received message
Receiver receives message
Sender received message
Receiver receives message
Sender received message
Receiver receives message
Sender received message
Sender is Finished
Receiver finished
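In the example above, the sender would wait forever if the receiver never replied. The receive construct accepts an optional after clause with a timeout in milliseconds for such cases. The following is a minimal sketch of a request and reply exchange with a timeout; the module echo_demo and its function names are hypothetical, and the one second timeout is an arbitrary choice:

```erlang
%% Filename: echo_demo.erl (hypothetical module, for illustration only)
-module(echo_demo).
-export([start/0, ask/2, loop/0]).

%% Start the echo process and return its PID.
start() ->
    spawn(echo_demo, loop, []).

%% The echo process sends every message back to whoever sent it.
loop() ->
    receive
        {From, Msg} ->
            From ! {self(), Msg},
            loop();
        stop ->
            ok
    end.

%% Send Msg to Pid and wait up to one second for the reply.
ask(Pid, Msg) ->
    Pid ! {self(), Msg},
    receive
        %% Pid is already bound, so only replies from that process match.
        {Pid, Reply} -> {ok, Reply}
    after 1000 ->
        timeout
    end.
```

After Pid = echo_demo:start(), the call echo_demo:ask(Pid, hello) returns {ok,hello}. Once the process has been told to stop with Pid ! stop, the same call returns timeout after one second.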
Now we have learned the basics of writing code in Erlang. Our final task is to look at a RabbitMQ plugin called Metronome, which is the official example plugin of RabbitMQ. It is published at https://www.rabbitmq.com/plugin-development.html.
The Metronome plugin simply declares a topic exchange called "metronome" and publishes a message to it every second with a routing key in the form yyyy.MM.dd.dow.hh.mm.ss. Therefore, any RabbitMQ client whose queue is bound to this exchange with a binding key such as "*.*.*.*.*.*.20", "2014.*.*.*.*.*.*", and so on receives the matching messages.
You can download the metronome plugin from the rabbitmq-metronome repository in RabbitMQ's official Mercurial repository into your RabbitMQ development environment. Then you need to build the plugin and enable it. Finally, run the RabbitMQ Server, and you will see that RabbitMQ runs the rabbitmq-metronome plugin. The following command lines show the process of running the metronome plugin:
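To get a feel for this routing key format, the following is a minimal sketch that builds such a key from a date and time value. The module rk_demo and the function routing_key/1 are hypothetical names introduced here for illustration; the format string is the one used in the plugin's worker module, and calendar:day_of_the_week/1 numbers the days from 1 (Monday) to 7 (Sunday):

```erlang
%% Filename: rk_demo.erl (hypothetical module, for illustration only)
-module(rk_demo).
-export([routing_key/1]).

%% Each ~W.10.0B directive prints an integer in base 10,
%% padded with zeros to a width of W characters.
-define(RKFormat,
        "~4.10.0B.~2.10.0B.~2.10.0B.~1.10.0B.~2.10.0B.~2.10.0B.~2.10.0B").

%% Takes a {Date, Time} tuple as returned by erlang:universaltime/0.
routing_key({{Year, Month, Day} = Date, {Hour, Min, Sec}}) ->
    DayOfWeek = calendar:day_of_the_week(Date),
    list_to_binary(
      io_lib:format(?RKFormat,
                    [Year, Month, Day, DayOfWeek, Hour, Min, Sec])).
```

For example, rk_demo:routing_key({{2014,3,5},{9,7,20}}) yields <<"2014.03.05.3.09.07.20">>, since 5 March 2014 was a Wednesday. A binding key of "*.*.*.*.*.*.20" would match this message because its last word is 20.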
hg clone http://hg.rabbitmq.com/rabbitmq-metronome/
make
mkdir -p rabbitmq-server/plugins-folder
cd rabbitmq-server/plugins-folder
ln -s rabbitmq-erlang-client
ln -s rabbitmq-metronome
scripts/rabbitmq-plugins enable rabbitmq_metronome
make run
vagrant@precise32:~$ sudo rabbitmqctl status
Status of node rabbit@precise32 ...
[{pid,844},
 {running_applications,
  [{rabbitmq_management,"RabbitMQ Management Console","3.0.4"},
   {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.0.4"},
   {rabbitmq-metronome, "Embedded Rabbit Metronome", "0.01"},
   {webmachine,"webmachine","1.9.1-rmq3.0.4-git52e62bc"},
   {mochiweb,"MochiMedia Web Server","2.3.1-rmq3.0.4-gitd541e9a"},
   {rabbitmq_management_agent,"RabbitMQ Management Agent","3.0.4"},
   {rabbit,"RabbitMQ","3.0.4"},
   {os_mon,"CPO CXC 138 46","2.2.7"},
   {inets,"INETS CXC 138 49","5.7.1"},
   {xmerl,"XML parser","1.2.10"},
   {mnesia,"MNESIA CXC 138 12","4.5"},
   {amqp_client,"RabbitMQ AMQP Client","3.0.4"},
   {sasl,"SASL CXC 138 11","2.1.10"},
   {stdlib,"ERTS CXC 138 10","1.17.5"},
   {kernel,"ERTS CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,
  "Erlang R14B04 (erts-5.8.5) [source] [rq:1] [async-threads:30] [kernel-poll:true] "},
 {memory,
  [{total,16283792},
   {connection_procs,2728},
   {queue_procs,25080},
   {plugins,48952},
   {other_proc,4756524},
   {mnesia,31508},
   {mgmt_db,25444},
   {msg_index,11208},
   {other_ets,521060},
   {binary,2784},
   {code,9136933},
   {atom,1027009},
   {other_system,694562}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,154828800},
 {disk_free_limit,1000000000},
 {disk_free,77275533312},
 {file_descriptors,
  [{total_limit,924},{total_used,5},{sockets_limit,829},{sockets_used,1}]},
 {processes,[{limit,1048576},{used,190}]},
 {run_queue,0},
 {uptime,206}]
...done.
Now that we have seen our custom plugin rabbitmq-metronome working on the RabbitMQ Server, let's move on to the details of this plugin and its code.
First, let's look over each file in rabbitmq-metronome with the following table:
| Filename | Description |
|---|---|
| rabbitmq_metronome.app.src | This file simply defines the dependencies and the application properties such as its name, its version, and so on. |
| rabbit_metronome.erl | This file implements the Erlang "application" behavior and starts and stops the plugin within the Erlang VM. |
| rabbit_metronome_sup.erl | This file implements the Erlang "supervisor" behavior that monitors the worker process and restarts it if it crashes. |
| rabbit_metronome_worker.erl | This file is the core of the plugin. All of the work is done by this code. In the metronome plugin, this code connects to the RabbitMQ Server and creates a task that is triggered every second. |
| | This file contains the tests of the plugin. You can run the tests with the following command line: make test |
After this overview of the files inside rabbitmq-metronome, we will now go into the details of the important ones, starting with rabbitmq_metronome.app.src:
% Filename: rabbitmq_metronome.app.src
{application, rabbitmq_metronome,
 [{description, "Embedded Rabbit Metronome"},
  {vsn, "0.01"},
  {modules, []},
  {registered, []},
  {mod, {rabbit_metronome, []}},
  {env, []},
  {applications, [kernel, stdlib, rabbit, amqp_client]}]}.
As we can see, this file simply defines the application name, its version, its modules, its environment variables, and its dependencies. Every Erlang application needs a file like this to describe itself.
The following code shows the module that implements the main functionality, called rabbit_metronome_worker. First we will look at the code, and then we'll discuss it in detail:
%% Filename: rabbit_metronome_worker.erl
%% Copyright (c) 2007-2013 GoPivotal, Inc.
%% You may use this code for any purpose.

-module(rabbit_metronome_worker).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
-export([fire/0]).

-include_lib("amqp_client/include/amqp_client.hrl").

-record(state, {channel}).

-define(RKFormat,
        "~4.10.0B.~2.10.0B.~2.10.0B.~1.10.0B.~2.10.0B.~2.10.0B.~2.10.0B").

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

%---------------------------
% Gen Server Implementation
% --------------------------

init([]) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_direct{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"metronome">>,
                                                   type = <<"topic">>}),
    fire(),
    {ok, #state{channel = Channel}}.

handle_call(_Msg, _From, State) ->
    {reply, unknown_command, State}.

handle_cast(fire, State = #state{channel = Channel}) ->
    Properties = #'P_basic'{content_type = <<"text/plain">>,
                            delivery_mode = 1},
    {Date={Year,Month,Day},{Hour, Min,Sec}} = erlang:universaltime(),
    DayOfWeek = calendar:day_of_the_week(Date),
    RoutingKey = list_to_binary(
                   io_lib:format(?RKFormat, [Year, Month, Day, DayOfWeek,
                                             Hour, Min, Sec])),
    Message = RoutingKey,
    BasicPublish = #'basic.publish'{exchange = <<"metronome">>,
                                    routing_key = RoutingKey},
    Content = #amqp_msg{props = Properties, payload = Message},
    amqp_channel:call(Channel, BasicPublish, Content),
    timer:apply_after(1000, ?MODULE, fire, []),
    {noreply, State};
handle_cast(_, State) ->
    {noreply,State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_, #state{channel = Channel}) ->
    amqp_channel:call(Channel, #'channel.close'{}),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%---------------------------

fire() ->
    gen_server:cast({global, ?MODULE}, fire).
The preceding code opens a connection and a channel to the server when the worker is initialized, and then, every second, it publishes a message to the metronome exchange with a routing key built from the date and time at which the message is sent. When the worker terminates, it closes its channel to the RabbitMQ Server.
The connection to the server is opened in the init() function. The functions whose names start with "handle" are gen_server callbacks, which are called whenever the worker process receives a request or a message. Therefore, we implement our message sending code in one of these functions, handle_cast(). Finally, we close our channel in the terminate() function.
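To see the gen_server callbacks in isolation, without any RabbitMQ dependency, here is a minimal sketch of a server that keeps a counter as its state. The module counter_server and its API functions are hypothetical names and are not part of the metronome plugin:

```erlang
%% Filename: counter_server.erl (hypothetical module, for illustration only)
-module(counter_server).
-behaviour(gen_server).

-export([start_link/0, increment/0, value/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Client API
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Asynchronous request: handled by handle_cast/2.
increment() ->
    gen_server:cast(?MODULE, increment).

%% Synchronous request: handled by handle_call/3.
value() ->
    gen_server:call(?MODULE, value).

%% Callbacks; the state is simply the current count.
init([]) ->
    {ok, 0}.

handle_call(value, _From, Count) ->
    {reply, Count, Count};
handle_call(_Msg, _From, Count) ->
    {reply, unknown_command, Count}.

handle_cast(increment, Count) ->
    {noreply, Count + 1};
handle_cast(_Msg, Count) ->
    {noreply, Count}.

handle_info(_Info, Count) ->
    {noreply, Count}.

terminate(_Reason, _Count) ->
    ok.

code_change(_OldVsn, Count, _Extra) ->
    {ok, Count}.
```

After counter_server:start_link(), two calls to counter_server:increment() followed by counter_server:value() return 2. The metronome worker follows the same shape, with the channel as its state and fire as its cast message.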
The following code shows the supervisor code of the plugin. As said earlier, the supervisor module monitors the worker and restarts it if it crashes. First we'll look at the source code, and then we'll dive into the code details:
%% Copyright (c) 2007-2013 GoPivotal, Inc.
%% You may use this code for any purpose.

-module(rabbit_metronome_sup).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []).

init([]) ->
    {ok, {{one_for_one, 3, 10},
          [{rabbit_metronome_worker,
            {rabbit_metronome_worker, start_link, []},
            permanent,
            10000,
            worker,
            [rabbit_metronome_worker]}
          ]}}.
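The supervisor code just monitors the worker. The supervisor is started with its start_link() function, and its init() function declares the worker as a permanent child with a one_for_one restart strategy, allowing at most 3 restarts within 10 seconds. The following code describes starting and stopping the rabbitmq-metronome plugin. The start() function is called when RabbitMQ starts the enabled plugin, and the stop() function is called when the plugin is stopped: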
%% Copyright (c) 2007-2013 GoPivotal, Inc.
%% You may use this code for any purpose.

-module(rabbit_metronome).
-behaviour(application).

-export([start/2, stop/1]).

start(normal, []) ->
    rabbit_metronome_sup:start_link().

stop(_State) ->
    ok.