1.0 Introduction
This lab is part of a series on IoT protocols. We will go over AMQP protocol theory and then run a demo using RabbitMQ, an open source AMQP broker, with Qpid Proton as the AMQP client. AMQP stands for Advanced Message Queuing Protocol. AMQP 1.0 is one of the protocols supported by Azure IoT Hub.
AMQP 1.0 and pre-1.0 AMQP are totally different protocols. AMQP 1.0 is a standard protocol, while the pre-1.0 versions are drafts. You can download the AMQP 1.0 standard from here. You can read about the differences between AMQP 1.0 and pre-1.0 AMQP here.
AMQP is a publish and subscribe protocol. In a typical pub/sub protocol you have a producer, a broker and a consumer. The job of the producer is to generate messages. The broker stores the messages in a queue until a consumer comes and reads them. A pub/sub protocol decouples the producer from the consumer: the producer doesn’t know about the consumer and the consumer doesn’t know about the producer. The consumer can be offline while the producer generates messages, and it reads them whenever it comes back online.
AMQP is a wire protocol, which means it provides rules for encoding data on the wire. Let’s work from the bottom up, which is how the AMQP standard is organized. I will start with how data is encoded, then cover the concept of containers/nodes and the various endpoints, and finally message transfer.
2.0 AMQP Types
AMQP defines a type system for encoding data. It has four kinds of types. In this tutorial we will go over primitive and described types.
- Primitive types
- Described types
- Composite types
- Restricted types
Primitive types
AMQP defines primitive types for encoding common scalar values and common collections.
- The scalar types include booleans, integral numbers, floating point numbers, timestamps, UUIDs, characters, strings, binary data, and symbols.
- The collection types include arrays, lists, and maps.
An AMQP encoded data stream consists of untyped bytes with embedded constructors. The embedded constructor indicates how to interpret the untyped bytes that follow.
Let’s see how variable-width data (a string) and fixed-width data are encoded using primitive types.
Variable width encoding:
The size of variable-width data is determined based on an encoded size that prefixes the data. The width of the encoded size is determined by the subcategory of the format code for the variable width value
The example below shows how to encode the string “Hello Glorious Messaging World” as a primitive type. The first byte is the constructor 0xA1, which represents variable-width UTF-8 string data with a one-octet length field. The constructor byte is followed by the length of the data, in this case 0x1E (30 octets).
Below is the breakdown of the constructor into 4 bits of subcategory and 4 bits of subtype.
The subcategory is defined in this table. There are fixed-width subcategories to encode numbers and variable-width subcategories to encode strings.
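To make the constructor breakdown concrete, here is a minimal Python sketch that splits a format code into its two nibbles and looks up the subcategory. The names `SUBCATEGORIES`, `split_format_code` and `describe` are my own and purely illustrative; the nibble values follow the subcategory table of the AMQP 1.0 type system.

```python
# Subcategory (high nibble of the format code) -> (category, width in octets).
# For "fixed" the width is the size of the data itself; for the other
# categories it is the width of the size (and count) prefix.
SUBCATEGORIES = {
    0x4: ("empty", 0),      # no data octets, e.g. null
    0x5: ("fixed", 1),
    0x6: ("fixed", 2),
    0x7: ("fixed", 4),
    0x8: ("fixed", 8),
    0x9: ("fixed", 16),
    0xA: ("variable", 1),   # one-octet size prefix
    0xB: ("variable", 4),   # four-octet size prefix
    0xC: ("compound", 1),
    0xD: ("compound", 4),
    0xE: ("array", 1),
    0xF: ("array", 4),
}


def split_format_code(code):
    """Return the (subcategory, subtype) nibbles of a one-octet format code."""
    return code >> 4, code & 0x0F


def describe(code):
    """Return (category, width, subtype) for a format code."""
    subcategory, subtype = split_format_code(code)
    category, width = SUBCATEGORIES[subcategory]
    return category, width, subtype


print(describe(0xA1))  # ('variable', 1, 1)
print(describe(0x54))  # ('fixed', 1, 4)
```

A decoder would typically call something like `describe` first, so that it knows how many octets to read next.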
Let’s see how it looks on the wire. Below is a Wireshark capture of the variable-width encoded string “Hello Glorious Messaging World”. As you can see, the encoded string starts with the constructor 0xa1, then the one-octet length field 0x1e, and then the string itself.
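The same bytes can be reproduced with a few lines of Python. This is only a sketch of the encoding described above, not code from any AMQP library; the function name `encode_str8` is an assumption of mine.

```python
def encode_str8(text):
    """Encode text as a variable-width UTF-8 string with a one-octet length."""
    data = text.encode("utf-8")
    if len(data) > 255:
        # Longer strings need the four-octet length variant (format code 0xB1).
        raise ValueError("too long for a one-octet length field")
    return bytes([0xA1, len(data)]) + data


encoded = encode_str8("Hello Glorious Messaging World")
print(encoded[:2].hex())  # a11e
print(encoded[2:])        # the 30 octets of string data
```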
Fixed width encoding:
The size of fixed-width data is determined based solely on the subcategory of the format code for the fixed width value.
Below is an example of fixed-width encoding. With fixed-width encoding there is no length octet after the constructor, because the length can be computed from the subcategory. The value is encoded using format code 0x54 (smallint), whose subcategory 0x5 means one octet of data.
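As a small illustration of the fixed-width case, the sketch below encodes and decodes a smallint with Python's `struct` module. The helper names are hypothetical; only the format code 0x54 and the one-octet signed payload come from the text above.

```python
import struct


def encode_smallint(value):
    """Constructor 0x54 followed by one signed data octet, no length field."""
    return struct.pack(">Bb", 0x54, value)


def decode_smallint(data):
    code, value = struct.unpack(">Bb", data[:2])
    if code != 0x54:
        raise ValueError("not a smallint")
    return value


print(encode_smallint(5).hex())            # 5405
print(decode_smallint(bytes([0x54, 0xFF])))  # -1
```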
Described types
AMQP provides a means to annotate primitive types with a descriptor to create described types. Described types are the way to create custom types.
– A descriptor defines how to produce a domain-specific type from an AMQP primitive value.
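On the wire, a described value is the octet 0x00 followed by the encoded descriptor and then the encoded primitive value. Here is a rough sketch of that layout; the descriptor name `example:celsius` and all helper names are made up for illustration and are not part of the standard.

```python
import struct


def encode_sym8(name):
    """Symbol with a one-octet length (format code 0xA3)."""
    data = name.encode("ascii")
    if len(data) > 255:
        raise ValueError("too long for a one-octet length field")
    return bytes([0xA3, len(data)]) + data


def encode_smallint(value):
    return struct.pack(">Bb", 0x54, value)


def encode_described(descriptor, value):
    """0x00 marks a described type: descriptor first, then the value."""
    return b"\x00" + descriptor + value


# Hypothetical domain type: a temperature in Celsius carried as a smallint.
celsius = encode_described(encode_sym8("example:celsius"), encode_smallint(21))
print(celsius.hex())
```

A receiver that does not know the descriptor can still parse the value as a plain smallint; it just cannot map it to the domain-specific type.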
3.0 AMQP Frame Transfer
AMQP network consists of nodes connected via links. Nodes are named entities responsible for the safe storage and/or delivery of messages. Messages can originate from, terminate at, or be relayed by nodes. A link is a unidirectional route between two nodes. A link attaches to a node at a terminus. There are two kinds of terminus: sources and targets. A terminus is responsible for tracking the state of a particular stream of incoming or outgoing messages. Sources track outgoing messages and targets track incoming messages. Messages only travel along a link if they meet the entry criteria at the source. As a message travels through an AMQP network, the responsibility for safe storage and delivery of the message is transferred between the nodes it encounters.
Container: an application, either a client app or a broker
Node: a message producer or consumer. Within an application you can have one process that produces messages and another that consumes them
Connection: a TCP connection between two containers
Channel: a connection is divided into multiple unidirectional channels
Session: a logical binding of channels between two containers. Two unidirectional channels form a bidirectional session. A connection can have multiple sessions
Link: a unidirectional route between two nodes, established within a session. A session can have multiple links. Within a session, links are uniquely identified by a link handle. A link carries the source and target of message delivery
Details on above entities
Connection:
A connection may contain multiple independent sessions, up to the negotiated channel limit (channel-max).
Connection opening:
A connection is opened between two containers by exchanging open frames. First a TCP connection is established and the protocol header is exchanged to negotiate the AMQP version. After that, each peer sends an open frame to set up the connection.
- After establishing or accepting a TCP connection and sending the protocol header, each peer MUST send an open frame before sending any other frames. The open frame describes the capabilities and limits of that peer. The open frame can only be sent on channel 0.
- After sending the open frame and reading its partner’s open frame a peer MUST operate within mutually acceptable limitations from this point forward
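The protocol header exchanged before the open frame is only 8 octets: the letters "AMQP", a protocol id, and the major, minor and revision numbers. A minimal sketch of building and checking it for AMQP 1.0 follows; the names `AMQP_HEADER` and `parse_protocol_header` are mine.

```python
import struct

# "AMQP", protocol id 0 (plain AMQP), then version 1.0.0
AMQP_HEADER = b"AMQP" + bytes([0, 1, 0, 0])


def parse_protocol_header(data):
    """Return (protocol_id, (major, minor, revision)) from an 8-octet header."""
    if len(data) != 8 or data[:4] != b"AMQP":
        raise ValueError("not an AMQP protocol header")
    protocol_id, major, minor, revision = struct.unpack(">BBBB", data[4:])
    return protocol_id, (major, minor, revision)


print(parse_protocol_header(AMQP_HEADER))  # (0, (1, 0, 0))
```

A peer that receives a header with a version it does not support would answer with its own supported header and close the socket, so this check is the first thing done on a new connection.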
Connection idle timeout
Connections are subject to an idle timeout threshold. The timeout is triggered by the local peer when no frames are received for longer than the threshold. The idle timeout is measured in milliseconds and starts from the time the last frame is received. If the threshold is exceeded, a peer SHOULD try to gracefully close the connection using a close frame with an error explaining why. If the remote peer does not respond gracefully within a threshold to this, then the peer MAY close the TCP socket.
At connection open each peer communicates the maximum period between activity (frames) on the connection that it desires from its partner
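The idle timeout bookkeeping can be sketched as a tiny tracker that remembers when the last frame arrived. This is an illustration only: the class name is hypothetical and timestamps are passed in explicitly (in milliseconds) so the logic is easy to follow and test.

```python
class IdleTimeoutTracker:
    """Tracks the time of the last received frame for one connection."""

    def __init__(self, timeout_ms, now_ms=0):
        self.timeout_ms = timeout_ms
        self.last_frame_ms = now_ms

    def on_frame(self, now_ms):
        # Any received frame resets the idle timer.
        self.last_frame_ms = now_ms

    def expired(self, now_ms):
        # True once the gap since the last frame exceeds the threshold.
        return now_ms - self.last_frame_ms > self.timeout_ms


tracker = IdleTimeoutTracker(timeout_ms=1000)
tracker.on_frame(now_ms=500)
print(tracker.expired(now_ms=1500))  # False
print(tracker.expired(now_ms=1501))  # True
```

When `expired` returns true, the peer would attempt the graceful close described above before falling back to closing the TCP socket.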
Session:
An AMQP session binds together two unidirectional channels to form a bidirectional, sequential conversation between two containers. A session is created using the begin performative and ended using the end performative.
Establishing a session:
- A session is established by creating a session endpoint, assigning it an unused channel number, and sending a begin that announces the association of the session endpoint with the outgoing channel number
- Upon receiving the begin, the partner checks the remote-channel field and finds it empty. This indicates that the begin refers to a remotely initiated session
- The partner therefore allocates an unused outgoing channel for the remotely initiated session and indicates this by sending its own begin, setting the remote-channel field to the incoming channel of the remotely initiated session
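The three steps above can be simulated with plain dictionaries standing in for begin frames. This is a simplified sketch, not Qpid Proton code: the `Peer` class is hypothetical, and the `"channel"` key stands for the channel number carried in the frame header.

```python
class Peer:
    """Tracks sessions as {outgoing channel: incoming channel or None}."""

    def __init__(self):
        self.sessions = {}

    def _unused_channel(self):
        channel = 0
        while channel in self.sessions:
            channel += 1
        return channel

    def begin(self):
        """Initiate a session: the remote-channel field is left empty."""
        channel = self._unused_channel()
        self.sessions[channel] = None
        return {"channel": channel, "remote_channel": None}

    def on_begin(self, frame):
        if frame["remote_channel"] is None:
            # Remotely initiated session: allocate our own outgoing channel
            # and answer with a begin that points back at the sender's channel.
            channel = self._unused_channel()
            self.sessions[channel] = frame["channel"]
            return {"channel": channel, "remote_channel": frame["channel"]}
        # Answer to a session we initiated: record the partner's channel.
        self.sessions[frame["remote_channel"]] = frame["channel"]
        return None


client, broker = Peer(), Peer()
broker.begin()                    # broker's channel 0 is already taken
request = client.begin()          # client uses channel 0, remote-channel empty
reply = broker.on_begin(request)  # broker allocates channel 1 and replies
client.on_begin(reply)
print(request, reply)
```

Note that the two directions of one session can use different channel numbers, which is why the remote-channel field is needed to pair them up.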
Link:
Within a Session a Link is a unidirectional route between a source and a target node, one at each end of the Connection. Links are the paths over which messages are transferred.
Links are unidirectional. Pairs of links are bound to create full duplex communication channels between endpoints. A single Session may be associated with any number of Links.
Links provide a credit-based flow control scheme based on the number of messages transmitted, allowing applications to control which nodes to receive messages from at a given point (e.g., to explicitly fetch a message from a given queue).
A link contains source and target names; these are the names of queues or topics in the partner.
Links are created using the attach performative and destroyed using the detach performative.
A link attaches to a node at a terminus. There are two kinds of terminus: sources and targets. A terminus is responsible for tracking the state of a particular stream of incoming or outgoing messages
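Credit-based flow control is easier to see in a toy model. In the sketch below the receiver grants credit and the sender transmits at most that many messages. It deliberately simplifies the real mechanism (an actual flow frame also carries a delivery count), and the `SenderLink` class is a hypothetical name.

```python
from collections import deque


class SenderLink:
    """Sending end of a link that may only send while it holds credit."""

    def __init__(self, messages):
        self.pending = deque(messages)
        self.credit = 0

    def on_flow(self, credit):
        # The receiver grants credit; treat it as the new absolute value.
        self.credit = credit
        return self.send_while_credit()

    def send_while_credit(self):
        sent = []
        while self.credit > 0 and self.pending:
            sent.append(self.pending.popleft())
            self.credit -= 1  # each message consumes one credit
        return sent


link = SenderLink(["m1", "m2", "m3"])
print(link.on_flow(2))           # ['m1', 'm2']
print(link.send_while_credit())  # [] because the credit is used up
print(link.on_flow(5))           # ['m3']
```

This is the same condition the demo producer checks with `event.sender.credit` before sending.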
4.0 AMQP Frame
Frames are divided into three distinct areas: a fixed width frame header, a variable width extended header, and a variable width frame body
frame header
The frame header is a fixed size (8 byte) structure that precedes each frame. The frame header includes mandatory information necessary to parse the rest of the frame including size and type information.
extended header
The extended header is a variable width area preceding the frame body. This is an extension point defined for future expansion. The treatment of this area depends on the frame type.
frame body
The frame body is a variable width sequence of bytes the format of which depends on the frame type.
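Here is a minimal sketch of parsing the 8-byte frame header with `struct`. The layout assumed is a 4-byte size, a 1-byte data offset (DOFF, counted in 4-byte words), a 1-byte type, and 2 type-specific bytes, which for AMQP frames hold the channel number. The function name is my own.

```python
import struct


def parse_frame_header(header):
    """Split the fixed 8-byte frame header into its fields."""
    size, doff, frame_type, channel = struct.unpack(">IBBH", header[:8])
    if doff < 2:
        raise ValueError("malformed frame: DOFF must be at least 2")
    body_offset = doff * 4  # DOFF counts 4-byte words
    return {
        "size": size,                           # total frame size in bytes
        "extended_header_len": body_offset - 8,
        "body_len": size - body_offset,
        "type": frame_type,                     # 0x00 AMQP, 0x01 SASL
        "channel": channel,
    }


# An empty frame on channel 0: header only, no extended header, no body.
empty_frame = struct.pack(">IBBH", 8, 2, 0x00, 0)
print(parse_frame_header(empty_frame))
```

An empty frame like the one above carries no performative and is what a peer can send just to keep the idle timeout from firing.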
5.0 Performative
6.0 QoS
AMQP provides QoS through the transfer and disposition performatives. Three QoS levels are supported, very similar to MQTT.
1. At-most once
If the sending application settles the delivery before sending it, this results in an at-most-once guarantee. The sender has indicated up front with its initial transmission that it has forgotten everything about this delivery and will therefore make no further attempts to send it. This is the fire and forget approach.
If this delivery makes it to the receiver, the receiver clearly has no obligation to respond with updates of the receiver’s delivery state, as they would be meaningless and ignored by the sender.
If the message is lost it will not be delivered again. It is a best effort service.
2. At-least once
The sender sends the delivery with settled = false and only settles it when a disposition with settled = true is received from the receiver. The sender sends the transfer and keeps its delivery tag in the unsettled map.
The receiver settles immediately upon processing the message and sends a disposition with settled = true.
The sender receives the disposition with settled = true and removes the delivery tag from its unsettled map. At this point the delivery is considered settled and forgotten.
If the transfer is lost, the sender, which still holds the delivery in its unsettled map, sends the message again, for example after a timeout or when the link is resumed.
If the disposition frame is lost, the sender likewise sends the message again.
The receiver can therefore get duplicate messages. It is a two-step process.
3. Exactly once
In this method the receiver processes each message only once; duplicate messages at the receiver are not allowed. AMQP achieves this by requiring acknowledgement from both the sender and the receiver. The sender sends the transfer with settled = false and keeps the delivery tag in its unsettled map.
The receiver sends a disposition with settled = false.
The sender sends a disposition back with settled = true and removes the delivery tag from its unsettled map.
Upon receiving settled = true from the sender, the receiver also settles and removes the tag from its unsettled map.
If the transfer is lost, the sender sends the message again, because the delivery is still in its unsettled map.
If the disposition frame from the receiver is lost, the receiver sends it again. If the receiver doesn’t receive settled = true for the delivery, it sends the disposition with settled = false again.
The receiver will get only one message.
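To see why at-least-once can produce duplicates, here is a toy simulation of the unsettled map when the receiver's disposition is lost. It is a sketch under simplifying assumptions: the `Sender` and `Receiver` classes are hypothetical, frames are plain dictionaries, and "loss" is modelled by simply not passing the disposition back.

```python
class Sender:
    def __init__(self):
        self.unsettled = {}  # delivery tag -> message

    def transfer(self, tag, message):
        self.unsettled[tag] = message
        return {"tag": tag, "message": message, "settled": False}

    def on_disposition(self, tag, settled):
        if settled:
            self.unsettled.pop(tag, None)  # settled and forgotten


class Receiver:
    def __init__(self):
        self.delivered = []

    def on_transfer(self, frame):
        # Settles immediately after processing (at-least-once behaviour).
        self.delivered.append(frame["message"])
        return {"tag": frame["tag"], "settled": True}


sender, receiver = Sender(), Receiver()
frame = sender.transfer("tag-1", "reading-1")
lost_disposition = receiver.on_transfer(frame)  # never reaches the sender

# "tag-1" is still unsettled, so the sender transfers it again.
for tag, message in list(sender.unsettled.items()):
    disposition = receiver.on_transfer(sender.transfer(tag, message))
    sender.on_disposition(disposition["tag"], disposition["settled"])

print(receiver.delivered)  # ['reading-1', 'reading-1']
print(sender.unsettled)    # {}
```

With exactly-once, the receiver would keep its own unsettled map keyed by delivery tag and drop the second copy instead of processing it.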
7.0 Message Transfer
The message is carried as the payload of an AMQP transfer frame.
An annotated message consists of the bare message plus sections for annotation at the head and tail of the bare message. There are two classes of annotations: annotations that travel with the message indefinitely, and annotations that are consumed by the next node
The bare message consists of three sections: standard properties, application-properties, and application-data (the body). Application-properties is where you can add your own properties.
The bare message is immutable within the AMQP network. That is, none of the sections can be changed by any node acting as an AMQP intermediary
Altogether a message consists of the following sections:
- Zero or one header.
- Zero or one delivery-annotations.
- Zero or one message-annotations.
- Zero or one properties.
- Zero or one application-properties.
- The body consists of one of the following three choices: one or more data sections, one or more amqp-sequence sections, or a single amqp-value section.
- Zero or one footer.
Header
The header section carries standard delivery details about the transfer of a message through the AMQP network. If the header section is omitted the receiver MUST assume the appropriate default values (or the meaning implied by no value being set) for the fields within the header unless other target or node specific defaults have otherwise been set
Delivery Annotations
The delivery-annotations section is used for delivery-specific non-standard properties at the head of the message. Delivery annotations convey information from the sending peer to the receiving peer. If the recipient does not understand the annotation it cannot be acted upon and its effects (such as any implied propagation) cannot be acted upon. Annotations might be specific to one implementation, or common to multiple implementations
If the delivery-annotations section is omitted, it is equivalent to a delivery-annotations section containing an empty map of annotations
Message Annotations
The message-annotations section is used for properties of the message which are aimed at the infrastructure and SHOULD be propagated across every delivery step. Message annotations convey information about the message
Properties
Immutable properties of the message. The properties section is used for a defined set of standard properties of the message. The properties section is part of the bare message; therefore, if retransmitted by an intermediary, it MUST remain unaltered
Application-properties
The application-properties section is a part of the bare message used for structured application data. Intermediaries can use the data within this structure for the purposes of filtering or routing. The keys of this map are restricted to be of type string (which excludes the possibility of a null key) and the values are restricted to be of simple types only, that is, excluding map, list, and array types
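The restrictions on application-properties can be checked with a few lines of Python. This is a sketch only: the function name is hypothetical, and the mapping from Python types to AMQP simple types in `SIMPLE_TYPES` is my own assumption rather than something defined by the standard or by Qpid Proton.

```python
import datetime
import uuid

# Assumed Python stand-ins for AMQP simple (non-collection) types.
SIMPLE_TYPES = (type(None), bool, int, float, str, bytes,
                uuid.UUID, datetime.datetime)


def validate_application_properties(props):
    """Raise TypeError if a key is not a string or a value is a collection."""
    for key, value in props.items():
        if not isinstance(key, str):
            raise TypeError("key %r must be a string" % (key,))
        if not isinstance(value, SIMPLE_TYPES):
            raise TypeError("value for %r must be a simple type" % (key,))
    return props


print(validate_application_properties({"Divine": "Life", "sequence": 1}))

try:
    validate_application_properties({"tags": ["a", "b"]})
except TypeError as error:
    print("rejected:", error)
```

The first dictionary matches the application properties sent by the demo producer, while the second is rejected because a list is not a simple type.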
8.0 Demo
For the demo we will use RabbitMQ as the broker and Qpid Proton Python as the client.
- Follow this link to install RabbitMQ. Note: I am using Ubuntu 16.04 LTS
https://tecadmin.net/install-rabbitmq-server-on-ubuntu/
- Enable the AMQP 1.0 plugin
$sudo rabbitmq-plugins list
$sudo rabbitmq-plugins enable rabbitmq_amqp1_0
- Check the RabbitMQ server status
$sudo service rabbitmq-server status
- Install the Python Qpid Proton package
$sudo apt-get update
$sudo apt-get install python-qpid-proton
Demo setup
The demo is set up on an Ubuntu 16.04 VM.
Two programs are created using the Qpid Proton library: 1) a producer and 2) a consumer. The producer sends 100 messages to the RabbitMQ broker and the consumer retrieves the messages from the broker. The destination queue in the broker is examples (the address used is localhost:5672/examples).
Open a web browser at localhost:15672 to reach the RabbitMQ web UI (login/password: guest/guest), then click on Queues and the examples queue. Here you can see the message count increase as the broker receives messages from the producer.
Below are the producer and consumer programs.
producer.py. This program sends 100 messages with application properties and waits for confirmation from the broker.
from __future__ import print_function
import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container


class Send(MessagingHandler):
    def __init__(self, url, messages):
        super(Send, self).__init__()
        self.url = url
        self.sent = 0
        self.confirmed = 0
        self.total = messages

    def on_start(self, event):
        # Open a connection, session and sending link to the given address
        event.container.create_sender(self.url)

    def on_sendable(self, event):
        # Send only while the broker has granted credit on the link
        while event.sender.credit and self.sent < self.total:
            msg = Message(id=(self.sent+1), body={'sequence': (self.sent+1)},
                          properties={'Divine': 'Life'})
            event.sender.send(msg)
            self.sent += 1

    def on_accepted(self, event):
        # Called when the broker accepts (confirms) a delivery
        self.confirmed += 1
        if self.confirmed == self.total:
            print("all messages confirmed")
            event.connection.close()

    def on_disconnected(self, event):
        # Resend everything that was not confirmed before the disconnect
        self.sent = self.confirmed


parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Send messages to the supplied address.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address to which messages are sent (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to send (default %default)")
opts, args = parser.parse_args()

try:
    Container(Send(opts.address, opts.messages)).run()
except KeyboardInterrupt:
    pass
consumer.py. This program retrieves the messages from the broker.
from __future__ import print_function
import optparse
from proton.handlers import MessagingHandler
from proton.reactor import Container


class Recv(MessagingHandler):
    def __init__(self, url, count):
        super(Recv, self).__init__()
        self.url = url
        self.expected = count
        self.received = 0

    def on_start(self, event):
        # Open a connection, session and receiving link to the given address
        event.container.create_receiver(self.url)

    def on_message(self, event):
        if event.message.id and event.message.id < self.received:
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print(event.message.body)
            print(event.message.properties)
            self.received += 1
            if self.received == self.expected:
                # All expected messages have arrived, shut down cleanly
                event.receiver.close()
                event.connection.close()


parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address from which messages are received (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to receive; 0 receives indefinitely (default %default)")
opts, args = parser.parse_args()

try:
    Container(Recv(opts.address, opts.messages)).run()
except KeyboardInterrupt:
    pass
Open a terminal and execute producer.py
$python producer.py
Open a browser and point it to the URL localhost:15672. Click on Queues and then examples. You should see 100 messages in the examples queue.
Open another terminal and execute the consumer.py program. You should see all 100 messages printed on the screen and the message count of the queue in the RabbitMQ server drop to zero.
$python consumer.py