event_rabbitmq Module

Razvan Crainea

OpenSIPS Solutions

Edited by

Razvan Crainea


Table of Contents

1. Admin Guide
1.1. Overview
1.2. RabbitMQ events syntax
1.3. RabbitMQ socket syntax
1.4. Dependencies
1.4.1. OpenSIPS Modules
1.4.2. External Libraries or Applications
1.5. Exported Parameters
1.5.1. heartbeat (integer)
1.6. Exported Functions
1.7. Example
1.8. Installation and Running
1.8.1. OpenSIPS config file
2. Frequently Asked Questions

List of Examples

1.1. Set heartbeat parameter
1.2. E_PIKE_BLOCKED event
1.3. RabbitMQ socket
1.4. OpenSIPS config script - sample event_rabbitmq usage
2.1. Event subscription
2.2. Raising an event

Chapter 1. Admin Guide

1.1. Overview

RabbitMQ (http://www.rabbitmq.com/) is an open source messaging server. Its purpose is to manage received messages in queues, taking advantage of the flexible AMQP protocol.

This module implements a RabbitMQ client for the Event Interface. It sends an AMQP message to a RabbitMQ server each time the Event Interface triggers an event that the server is subscribed to.

The AMQP protocol is only used as the transport layer for notifications. The content of a message is presented in the next section.

1.2. RabbitMQ events syntax

Raised events follow this grammar:

  • event = event_name (argument '\n')*

  • event_name = not-quoted_string '\n'

  • argument = ((arg_name '::')? arg_value)? | (arg_value)

  • arg_name = not-quoted_string

  • arg_value = not-quoted_string | '"' string '"'

  • not-quoted_string = string - {',",\n,\r}

The event name can contain any character allowed in a not-quoted_string, but it is recommended to follow the syntax: E_MODULE_NAME_EXTRA_NAME
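
The grammar above can be read by a consumer with a few lines of code. The following is a minimal sketch in Python, not part of OpenSIPS; parse_event is a hypothetical helper name. It assumes that a quoted value never spans several lines and that quoted values carry no escape sequences, neither of which the grammar specifies.

```python
from typing import List, Optional, Tuple

# (arg_name, arg_value); arg_name is None for unnamed arguments.
Argument = Tuple[Optional[str], str]


def parse_event(payload: str) -> Tuple[str, List[Argument]]:
    """Split a raw event payload into (event_name, arguments)."""
    lines = payload.split("\n")
    event_name = lines[0].rstrip("\r")
    if not event_name:
        raise ValueError("missing event name")

    arguments: List[Argument] = []
    for line in lines[1:]:
        line = line.rstrip("\r")
        if not line:
            continue  # empty argument line (or the trailing newline)
        name: Optional[str] = None
        value = line
        # A quoted value may itself contain '::', so only look for an
        # argument name when the line does not start with a double quote.
        if not line.startswith('"') and "::" in line:
            name, value = line.split("::", 1)
        # Assumption: quoted values are delimited by plain double quotes.
        if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
            value = value[1:-1]
        arguments.append((name, value))
    return event_name, arguments
```

For instance, parse_event("E_PIKE_BLOCKED\nip::192.168.2.11\n") yields the event name E_PIKE_BLOCKED and a single argument named ip.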

1.3. RabbitMQ socket syntax

'rabbitmq:' [user [':' password] '@'] host [':' port] '/' [exchange '?'] routing_key

Meanings:

  • 'rabbitmq:' - informs the Event Interface that the events sent to this subscriber should be handled by the event_rabbitmq module.

  • user - username used for RabbitMQ server authentication. The default value is 'guest'.

  • password - password used for RabbitMQ server authentication. The default value is 'guest'.

  • host - host name of the RabbitMQ server.

  • port - port of the RabbitMQ server. The default value is '5672'.

  • exchange - exchange of the RabbitMQ server. The default value is ''.

  • routing_key - the routing key used by the AMQP protocol to identify the queue where the event should be sent.

    NOTE: if the queue does not exist, this module will not try to create it.
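
To make the defaults listed above concrete, the following Python sketch splits a socket string into its components. It is only an illustration and does not reproduce the module's own parser; parse_socket and the regular expression are hypothetical, and the sketch assumes that the user name, password and host contain none of the delimiter characters.

```python
import re
from typing import Dict, Union

# One named group per component of the socket syntax.
_SOCKET_RE = re.compile(
    r"^rabbitmq:"
    r"(?:(?P<user>[^:@/]+)(?::(?P<password>[^@]+))?@)?"
    r"(?P<host>[^:/]+)"
    r"(?::(?P<port>\d+))?"
    r"/"
    r"(?:(?P<exchange>[^?]*)\?)?"
    r"(?P<routing_key>.+)$"
)


def parse_socket(socket: str) -> Dict[str, Union[str, int]]:
    """Return the socket components, applying the documented defaults."""
    match = _SOCKET_RE.match(socket)
    if match is None:
        raise ValueError("not a valid rabbitmq socket: " + socket)
    return {
        "user": match.group("user") or "guest",
        "password": match.group("password") or "guest",
        "host": match.group("host"),
        "port": int(match.group("port") or 5672),
        "exchange": match.group("exchange") or "",
        "routing_key": match.group("routing_key"),
    }
```

With these defaults, rabbitmq:guest:guest@127.0.0.1:5672/pike and rabbitmq:127.0.0.1/pike produce the same result.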

1.4. Dependencies

1.4.1. OpenSIPS Modules

The following modules must be loaded before this module:

  • No dependencies on other OpenSIPS modules.

1.4.2. External Libraries or Applications

The following libraries or applications must be installed before running OpenSIPS with this module loaded:

  • librabbitmq-dev

1.5. Exported Parameters

1.5.1. heartbeat (integer)

Enables heartbeat support for the AMQP communication. If the client does not receive a heartbeat from the server within the specified interval, the socket is automatically closed by the rabbitmq-client. This prevents OpenSIPS from blocking while waiting for a response from a dead rabbitmq-server. The value represents the heartbeat interval in seconds.

Default value is 0 (disabled).

Example 1.1. Set heartbeat parameter

...
modparam("event_rabbitmq", "heartbeat", 3)
...

1.6. Exported Functions

No functions are exported for use from the configuration file.

1.7. Example

This is an example of an event raised by the pike module when it decides an IP address should be blocked:

Example 1.2. E_PIKE_BLOCKED event


	E_PIKE_BLOCKED
	ip::192.168.2.11


Example 1.3. RabbitMQ socket


	rabbitmq:guest:guest@127.0.0.1:5672/pike

	# same socket can be written as
	rabbitmq:127.0.0.1/pike


1.8. Installation and Running

1.8.1. OpenSIPS config file

This configuration file shows how the event_rabbitmq module is used. In this scenario, a message is sent to a RabbitMQ server every time OpenSIPS receives a MESSAGE request. The parameters passed to the server are the R-URI username and the message body.

Example 1.4. OpenSIPS config script - sample event_rabbitmq usage

...
loadmodule "signaling.so"
loadmodule "sl.so"
loadmodule "tm.so"
loadmodule "rr.so"
loadmodule "maxfwd.so"
loadmodule "usrloc.so"
loadmodule "registrar.so"
loadmodule "textops.so"
loadmodule "uri.so"
loadmodule "acc.so"
loadmodule "event_rabbitmq.so"

startup_route {
	if (!subscribe_event("E_SIP_MESSAGE", "rabbitmq:127.0.0.1/sipmsg")) {
		xlog("L_ERR","cannot subscribe the RabbitMQ server to the E_SIP_MESSAGE event\n");
	}
}

route{

	if (!mf_process_maxfwd_header("10")) {
		sl_send_reply("483","Too Many Hops");
		exit;
	}

	if (has_totag()) {
		if (loose_route()) {
			if (is_method("INVITE")) {
				record_route();
			}
			route(1);
		} else {
			if ( is_method("ACK") ) {
				if ( t_check_trans() ) {
					t_relay();
					exit;
				} else {
					exit;
				}
			}
			sl_send_reply("404","Not here");
		}
		exit;
	}

	if (is_method("CANCEL"))
	{
		if (t_check_trans())
			t_relay();
		exit;
	}

	t_check_trans();

	if (loose_route()) {
		xlog("L_ERR",
		"Attempt to route with preloaded Route's [$fu/$tu/$ru/$ci]");
		if (!is_method("ACK"))
			sl_send_reply("403","Preload Route denied");
		exit;
	}

	if (!is_method("REGISTER|MESSAGE"))
		record_route();

	if (!uri==myself)
	{
		append_hf("P-hint: outbound\r\n"); 
		route(1);
	}

	if (is_method("PUBLISH"))
	{
		sl_send_reply("503", "Service Unavailable");
		exit;
	}
	

	if (is_method("REGISTER"))
	{
		if (!save("location"))
			sl_reply_error();

		exit;
	}

	if ($rU==NULL) {
		sl_send_reply("484","Address Incomplete");
		exit;
	}

	if (is_method("MESSAGE")) {
		$avp(attrs) = "user";
		$avp(vals) = $rU;
		$avp(attrs) = "msg";
		$avp(vals) = $rb;
		if (!raise_event("E_SIP_MESSAGE", $avp(attrs), $avp(vals)))
			xlog("L_ERR", "cannot raise E_SIP_MESSAGE event\n");
	}

	if (!lookup("location","m")) {
		switch ($retcode) {
			case -1:
			case -3:
				t_newtran();
				t_reply("404", "Not Found");
				exit;
			case -2:
				sl_send_reply("405", "Method Not Allowed");
				exit;
		}
	}

	route(1);
}


route[1] {
	if (is_method("INVITE")) {
		t_on_failure("1");
	}

	if (!t_relay()) {
		sl_reply_error();
	};
	exit;
}


failure_route[1] {
	if (t_was_cancelled()) {
		exit;
	}
}

...
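
As a rough illustration of what a consumer of the sipmsg queue might receive from the configuration above, the sketch below builds a payload following the grammar in section 1.2. It is written in Python and is not part of OpenSIPS; format_event is a hypothetical helper. The quoting rule (values containing a character excluded from not-quoted_string are wrapped in double quotes, without escaping) and the argument order are assumptions, so the exact bytes sent by the module may differ.

```python
from typing import Iterable, Optional, Tuple

# Characters excluded from not-quoted_string by the grammar in section 1.2.
_NEEDS_QUOTES = set("'\"\n\r")


def format_event(name: str,
                 arguments: Iterable[Tuple[Optional[str], str]]) -> str:
    """Serialize an event name and its arguments, one item per line."""
    lines = [name]
    for arg_name, arg_value in arguments:
        # Assumption: quote only when the value cannot be a not-quoted_string.
        if any(ch in _NEEDS_QUOTES for ch in arg_value):
            arg_value = '"' + arg_value + '"'
        if arg_name is None:
            lines.append(arg_value)
        else:
            lines.append(f"{arg_name}::{arg_value}")
    return "\n".join(lines) + "\n"
```

For a MESSAGE sent to user alice with the body Hello, format_event("E_SIP_MESSAGE", [("user", "alice"), ("msg", "Hello")]) gives the event name followed by the lines user::alice and msg::Hello.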

Chapter 2. Frequently Asked Questions

2.1.

What is the maximum length of an AMQP message?

The maximum length of an event sent by this module is 16384 bytes.

2.2.

Where can I find more about OpenSIPS?

Take a look at http://www.opensips.org/.

2.3.

What is the vhost used by the AMQP server?

Currently, the only vhost supported is '/'.

2.4.

How can I set a vhost in the socket?

This version doesn't support a different vhost.

2.5.

How can I send an event to my RabbitMQ server?

This module acts as a transport module for the OpenSIPS Event Interface, so sending an event follows the usual Event Interface workflow:

The first step is to subscribe the RabbitMQ server to the OpenSIPS Event Interface. This can be done using the subscribe_event core function:

Example 2.1. Event subscription

startup_route {
	subscribe_event("E_RABBITMQ_EVENT", "rabbitmq:127.0.0.1/queue");
}
		

The next step is to raise the event from the script, using the raise_event core function:

Example 2.2. Raising an event

route {
	...
	/* decided that an event should be raised */
	raise_event("E_RABBITMQ_EVENT");
	...
}
		

NOTE: the event used above only illustrates usage from the script. Any event published through the OpenSIPS Event Interface can be sent using this module.

2.6.

Where can I find more information about RabbitMQ?

You can find more information about RabbitMQ on their official website (http://www.rabbitmq.com/).

2.7.

Where can I post a question about this module?

First of all, check whether your question has already been answered on one of our mailing lists:

E-mails regarding any stable OpenSIPS release should be sent to and e-mails regarding development versions should be sent to .

If you want to keep the mail private, send it to .

2.8.

How can I report a bug?

Please follow the guidelines provided at: https://github.com/OpenSIPS/opensips/issues.