Lab-53: AMQP 1.0 protocol

1.0 Introduction

Continuing the series on IoT protocols, in this lab we will go over AMQP protocol theory and then run a demo using RabbitMQ, an open source AMQP broker, and Qpid Proton as the AMQP client. AMQP stands for Advanced Message Queuing Protocol. AMQP 1.0 is one of the protocols supported by Azure IoT Hub.

AMQP 1.0 and pre-AMQP 1.0 are totally different protocols. AMQP 1.0 is a standard protocol, while the pre-AMQP 1.0 versions are drafts. You can download the AMQP 1.0 standard from here. You can read about the difference between AMQP 1.0 and pre-AMQP 1.0 here.

AMQP is a publish and subscribe protocol. In a typical pub-sub protocol you have a producer, a broker and a consumer. The job of the producer is to generate messages. The broker stores the messages in a queue until a consumer comes and reads them. A pub-sub protocol decouples the producer from the consumer: the producer doesn’t know about the consumer and the consumer doesn’t know about the producer. The consumer can be offline while the producer generates messages, and whenever it comes online it reads the messages.

AMQP is a wire protocol, which means it provides rules for encoding data on the wire. Let’s go bottom up, since that is how the AMQP standard is organized. I will start with how data is encoded, then cover the concept of containers/nodes and the various endpoints, and finally message transfer.

2.0 AMQP Types

AMQP defines a type system for encoding data. The type system has four kinds of types. In this tutorial we will go over Primitive and Described types

  1. Primitive types
  2. Described types
  3. Composite types
  4. Restricted types

Primitive types

AMQP defines Primitive types for encoding common scalar values and common collections.

  • The scalar types include booleans, integral numbers, floating point numbers, timestamps, UUIDs, characters, strings, binary data, and symbols.
  • The collection types include arrays, lists, and maps

An AMQP encoded data stream consists of untyped bytes with embedded constructors. The embedded constructor indicates how to interpret the untyped bytes that follow.

Let’s see how variable-width data (such as a string) and fixed-width data are encoded using Primitive types

Variable width encoding:

The size of variable-width data is determined based on an encoded size that prefixes the data. The width of the encoded size is determined by the subcategory of the format code for the variable-width value.

The example below shows how to encode the string “Hello Glorious Messaging World” using a Primitive type. The first byte is the constructor 0xA1, which represents variable-width data with one octet for the data length (the size of the length field is one octet). The constructor byte is followed by the length of the data, in this case 0x1E (30 octets)

amqp_1

Below is the breakdown of Constructor with 4 bits of subcategory and 4 bits of subtype.

amqp_2

The subcategories are defined in the table below. There are fixed-width subcategories to encode numbers and variable-width subcategories to encode strings

amqp_4
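As a rough illustration of the constructor breakdown, the sketch below splits a one-octet format code into its subcategory and subtype nibbles and looks up the width implied by the subcategory. The function names `split_format_code` and `describe` are made up for this example; the table values are taken from the format-code categories in the AMQP 1.0 type system.

```python
# Subcategory nibble -> (category, width in octets).
# For "fixed" the width is the data size; for the others it is the
# width of the size (and count) fields that prefix the data.
SUBCATEGORIES = {
    0x4: ("fixed", 0),
    0x5: ("fixed", 1),
    0x6: ("fixed", 2),
    0x7: ("fixed", 4),
    0x8: ("fixed", 8),
    0x9: ("fixed", 16),
    0xA: ("variable", 1),
    0xB: ("variable", 4),
    0xC: ("compound", 1),
    0xD: ("compound", 4),
    0xE: ("array", 1),
    0xF: ("array", 4),
}


def split_format_code(code):
    """Return (subcategory, subtype): the high and low 4 bits of the octet."""
    return code >> 4, code & 0x0F


def describe(code):
    """Return (category, width, subtype) for a one-octet format code."""
    subcategory, subtype = split_format_code(code)
    category, width = SUBCATEGORIES[subcategory]
    return category, width, subtype


print(describe(0xA1))  # ('variable', 1, 1) -> string with a 1-octet length
print(describe(0x54))  # ('fixed', 1, 4)    -> smallint, 1 octet of data
```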

Let’s see how it looks on the wire. Below is a Wireshark capture of the variable-width encoded string “Hello Glorious Messaging World”. As you can see, the encoded string starts with the constructor 0xa1, then the one-octet length field 0x1e, and then the string itself

amqp_3
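The variable-width layout described above (constructor, one-octet length, data) can be reproduced by hand. This is only a minimal sketch for short strings; `encode_str8` and `decode_str8` are hypothetical helper names, not part of any AMQP library.

```python
import struct


def encode_str8(text):
    """Encode text as constructor 0xA1, a 1-octet length, then UTF-8 data."""
    data = text.encode("utf-8")
    if len(data) > 255:
        raise ValueError("a one-octet length field holds at most 255 octets")
    return struct.pack(">BB", 0xA1, len(data)) + data


def decode_str8(encoded):
    """Decode a value produced by encode_str8."""
    constructor, size = struct.unpack(">BB", encoded[:2])
    if constructor != 0xA1:
        raise ValueError("not a one-octet-length string value")
    return encoded[2:2 + size].decode("utf-8")


encoded = encode_str8("Hello Glorious Messaging World")
print(hex(bytearray(encoded)[0]), hex(bytearray(encoded)[1]))  # 0xa1 0x1e
print(decode_str8(encoded))
```

The first two octets match what the Wireshark capture shows: 0xa1 followed by 0x1e.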

Fixed width encoding:

The size of fixed-width data is determined based solely on the subcategory of the format code for the fixed width value.

Below is an example of fixed-width encoding. With fixed-width encoding there is no length octet after the constructor; the length is computed from the subcategory. The value is encoded using the format code 0x54 (smallint), whose subcategory 0x5 means one octet of data

amqp_5
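For comparison with the variable-width case, here is a small sketch of the fixed-width smallint encoding: the format code 0x54 followed directly by one signed octet, with no length field. `encode_smallint` is a hypothetical helper name used only for illustration.

```python
import struct


def encode_smallint(value):
    """Encode an int as format code 0x54 plus one signed octet of data."""
    if not -128 <= value <= 127:
        raise ValueError("smallint carries a single signed octet")
    return struct.pack(">Bb", 0x54, value)


print(repr(encode_smallint(5)))
print(len(encode_smallint(-1)))  # always 2 octets: constructor + data
```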

Described types

AMQP provides a means to annotate Primitive types with a descriptor to create described types. Described types are a way to create custom types

– A descriptor defines how to produce a domain specific type from an AMQP primitive value

amqp_6

 

amqp_7
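A described value on the wire is the octet 0x00, followed by the descriptor (itself an encoded AMQP value), followed by the underlying primitive value. The sketch below assumes a symbol descriptor with a one-octet length (format code 0xA3); the descriptor name `example:url` and all helper names are invented for this illustration.

```python
import struct


def encode_sym8(name):
    """Encode an ASCII symbol: format code 0xA3, 1-octet length, data."""
    data = name.encode("ascii")
    return struct.pack(">BB", 0xA3, len(data)) + data


def encode_str8(text):
    """Encode a short string: format code 0xA1, 1-octet length, UTF-8 data."""
    data = text.encode("utf-8")
    return struct.pack(">BB", 0xA1, len(data)) + data


def encode_described(descriptor, value):
    """Prefix an already-encoded value with 0x00 and its encoded descriptor."""
    return b"\x00" + descriptor + value


# Hypothetical custom type: a URL carried as a described string.
url = encode_described(encode_sym8("example:url"),
                       encode_str8("http://example.org/hello-world"))
print(len(url))  # 46
```

A receiver that does not recognise the descriptor can still skip or relay the value, because the underlying string is an ordinary primitive.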

 

3.0 AMQP Frame Transfer

AMQP network consists of nodes connected via links. Nodes are named entities responsible for the safe storage and/or delivery of messages. Messages can originate from, terminate at, or be relayed by nodes. A link is a unidirectional route between two nodes. A link attaches to a node at a terminus. There are two kinds of terminus: sources and targets. A terminus is responsible for tracking the state of a particular stream of incoming or outgoing messages. Sources track outgoing messages and targets track incoming messages. Messages only travel along a link if they meet the entry criteria at the source. As a message travels through an AMQP network, the responsibility for safe storage and delivery of the message is transferred between the nodes it encounters.

Container: an application, either a client app or a broker

Node: a message producer or consumer. Within an application you can have a process which produces messages and a process which consumes messages

Connection: a TCP connection between two containers

Channel: a connection is divided into multiple unidirectional channels

Session: a logical binding of channels between two containers. Two unidirectional channels form a bidirectional session. A connection can have multiple sessions

Link: a unidirectional route between two nodes, carried within a session. A session can have multiple links. Within a session, links are uniquely identified by a link handle. A link names the source and target of message delivery

amqp_8

Details on above entities

Connection:

A Connection may contain multiple independent Sessions, up to the negotiated channel limit

Connection opening:

A connection is opened between two containers by exchanging Open frames. First a TCP connection is established and protocol headers are exchanged to negotiate the AMQP version. After that each peer sends an Open frame to set up the connection

  • After establishing or accepting a TCP connection and sending the protocol header, each peer MUST send an open frame before sending any other frames. The open frame describes the capabilities and limits of that peer. The open frame can only be sent on channel 0.
  • After sending the open frame and reading its partner’s open frame a peer MUST operate within mutually acceptable limitations from this point forward

amqp_16
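The protocol header exchanged before the open frames is 8 octets: the ASCII letters "AMQP", a protocol id, and the major, minor and revision numbers. A minimal sketch of building and checking it follows; the helper names are assumptions made for this example.

```python
import struct

PLAIN_AMQP = 0  # protocol id for plain AMQP (TLS and SASL use other ids)


def build_protocol_header(protocol_id=PLAIN_AMQP, major=1, minor=0, revision=0):
    """Return the 8-octet header a peer sends right after the TCP connect."""
    return b"AMQP" + struct.pack(">BBBB", protocol_id, major, minor, revision)


def parse_protocol_header(data):
    """Return (protocol_id, major, minor, revision) or raise ValueError."""
    if len(data) != 8 or data[:4] != b"AMQP":
        raise ValueError("not an AMQP protocol header")
    return struct.unpack(">BBBB", data[4:])


header = build_protocol_header()
print(repr(header))
print(parse_protocol_header(header))  # (0, 1, 0, 0)
```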

Connection idle timeout

Connections are subject to an idle timeout threshold. The timeout is triggered by the local peer when no AMQP frames are received within the threshold. The idle timeout is measured in milliseconds and starts from the time the last frame is received. If the threshold is exceeded, a peer SHOULD try to gracefully close the connection using a close frame with an error explaining why. If the remote peer does not respond gracefully to this within a threshold, the peer MAY close the TCP socket

At connection open, each peer communicates the maximum period between activity (frames) on the connection that it desires from its partner

amqp_22
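The idle-timeout rule can be pictured as a small timer that is reset on every received frame. The class below is only a sketch of that bookkeeping with timestamps passed in explicitly; `IdleTimeoutTracker` is a hypothetical name, and a real client library handles this internally.

```python
class IdleTimeoutTracker(object):
    """Tracks whether the peer has been silent longer than the threshold."""

    def __init__(self, timeout_ms, now_ms=0):
        self.timeout_ms = timeout_ms
        self.last_frame_ms = now_ms

    def frame_received(self, now_ms):
        # Any frame, including an empty one, restarts the timer.
        self.last_frame_ms = now_ms

    def expired(self, now_ms):
        # True once more than timeout_ms has passed since the last frame.
        return now_ms - self.last_frame_ms > self.timeout_ms


tracker = IdleTimeoutTracker(timeout_ms=1000)
tracker.frame_received(now_ms=900)
print(tracker.expired(now_ms=1500))  # False: only 600 ms of silence
print(tracker.expired(now_ms=2000))  # True: 1100 ms of silence
```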

Session:

An AMQP session binds together two unidirectional channels to form a bidirectional, sequential conversation between two containers. A session is created using the Begin performative and ended using the End performative

Establishing a session:

  • A session is established by creating a session endpoint, assigning it an unused channel number, and sending a begin that announces the association of the session endpoint with the outgoing channel number
  • Upon receiving the begin, the partner checks the remote-channel field and finds it empty. This indicates that the begin refers to a remotely initiated session
  • The partner therefore allocates an unused outgoing channel for the remotely initiated session and indicates this by sending its own begin, setting the remote-channel field to the incoming channel of the remotely initiated session

amqp_17
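The begin handshake above can be simulated with two small channel tables. This is a toy model under simplifying assumptions: begins are plain dictionaries, channels are allocated lowest-free-first, and the class name `SessionTable` is invented for this example.

```python
class SessionTable(object):
    """Per-connection map of outgoing channel -> incoming channel."""

    def __init__(self, channel_max=65535):
        self.channel_max = channel_max
        self.outgoing = {}  # value is None until the partner's begin arrives

    def _allocate(self):
        for channel in range(self.channel_max + 1):
            if channel not in self.outgoing:
                return channel
        raise RuntimeError("no unused channel left")

    def initiate(self):
        """Start a session: pick an unused channel, send begin without remote-channel."""
        channel = self._allocate()
        self.outgoing[channel] = None
        return {"channel": channel, "remote_channel": None}

    def on_begin(self, begin):
        """Handle a received begin; return the reply begin, or None."""
        if begin["remote_channel"] is None:
            # Remotely initiated session: allocate our own outgoing channel.
            channel = self._allocate()
            self.outgoing[channel] = begin["channel"]
            return {"channel": channel, "remote_channel": begin["channel"]}
        # Reply to a session we initiated: record the partner's channel.
        self.outgoing[begin["remote_channel"]] = begin["channel"]
        return None


client, broker = SessionTable(), SessionTable()
broker.initiate()                 # broker already uses its channel 0
begin = client.initiate()         # client begins on channel 0
reply = broker.on_begin(begin)    # broker answers on channel 1
client.on_begin(reply)
print(reply)
print(client.outgoing)            # {0: 1}
```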

Link:

Within a Session a Link is a unidirectional route between a source and a target node, one at each end of the Connection. Links are the paths over which messages are transferred.

Links are unidirectional. Pairs of links are bound to create full duplex communication channels between endpoints. A single Session may be associated with any number of Links.

Links provide a credit-based flow control scheme based on the number of messages transmitted, allowing applications to control which nodes to receive messages from at a given point (e.g., to explicitly fetch a message from a given queue).

A link contains source and target names; these are the names of queues or topics in the partner

Links are created using the attach performative and destroyed using the detach performative

A link attaches to a node at a terminus. There are two kinds of terminus: sources and targets. A terminus is responsible for tracking the state of a particular stream of incoming or outgoing messages
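Credit-based flow control can be sketched as a sender that only transmits while it holds credit granted by the receiver. The model below is deliberately simplified: it treats the granted credit as a plain counter and ignores the delivery-count arithmetic that the real flow performative carries. `SenderLink` is a hypothetical class name.

```python
from collections import deque


class SenderLink(object):
    """Toy sending end of a link that respects receiver-granted credit."""

    def __init__(self, messages):
        self.pending = deque(messages)  # messages waiting at the source
        self.credit = 0                 # no credit until the receiver grants it
        self.delivered = []

    def on_flow(self, link_credit):
        # The receiver grants credit for this many messages.
        self.credit = link_credit
        self._drain()

    def _drain(self):
        # Each transfer consumes one unit of credit.
        while self.credit > 0 and self.pending:
            self.delivered.append(self.pending.popleft())
            self.credit -= 1


link = SenderLink(["m1", "m2", "m3"])
print(link.delivered)   # [] because nothing moves without credit
link.on_flow(2)
print(link.delivered)   # ['m1', 'm2']
link.on_flow(5)
print(link.delivered, link.credit)   # ['m1', 'm2', 'm3'] 4
```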

4.0 AMQP Frame

Frames are divided into three distinct areas: a fixed width frame header, a variable width extended header, and a variable width frame body

frame header

The frame header is a fixed size (8 byte) structure that precedes each frame. The frame header includes mandatory information necessary to parse the rest of the frame including size and type information.

extended header

The extended header is a variable width area preceding the frame body. This is an extension point defined for future expansion. The treatment of this area depends on the frame type.

frame body

The frame body is a variable width sequence of bytes the format of which depends on the frame type.

amqp_18

amqp_19
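To make the three frame areas concrete, the sketch below parses the 8-octet frame header: a 4-octet frame size, a 1-octet data offset (DOFF, counted in 4-octet words), a 1-octet frame type, and 2 type-specific octets that carry the channel number for AMQP frames (type 0x00). The function name and the returned dictionary layout are assumptions for illustration.

```python
import struct


def parse_frame(data):
    """Split one complete frame into header fields, extended header and body."""
    size, doff, frame_type, channel = struct.unpack(">IBBH", data[:8])
    if doff < 2:
        raise ValueError("malformed frame: DOFF must be at least 2")
    body_offset = 4 * doff  # the body starts DOFF * 4 octets into the frame
    return {
        "size": size,
        "doff": doff,
        "type": frame_type,
        "channel": channel,
        "extended_header": data[8:body_offset],
        "body": data[body_offset:size],
    }


# An 11-octet frame on channel 1 with no extended header and a 3-octet body.
frame = struct.pack(">IBBH", 11, 2, 0x00, 1) + b"abc"
print(parse_frame(frame))
```

A frame whose size is exactly 8 has an empty body.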

5.0 Performative

amqp_15

 

 

6.0 QOS

AMQP provides QoS using the disposition and transfer performatives. Three QoS levels are supported, which are very similar to those in MQTT

  1. At-most once

If the sending application settles the delivery before sending it, this results in an at-most-once guarantee. The sender has indicated up front with its initial transmission that it has forgotten everything about this delivery and will therefore make no further attempts to send it. This is a fire-and-forget approach.

If this delivery makes it to the receiver, the receiver clearly has no obligation to respond with updates of the receiver’s delivery state, as they would be meaningless and ignored by the sender

If the message is lost, it will not be delivered again. It is a best-effort service

amqp_12

  2. At-least once

The sender sends the delivery with settled = false and settles it only when a disposition with settled = true is received from the receiver. The sender sends the transfer and keeps its delivery tag in the unsettled map

The receiver settles immediately upon processing the message and sends a disposition with settled = true

The sender receives the disposition with settled = true and removes the delivery tag from the unsettled map. At this point the delivery is considered settled and forgotten.

If the transfer is lost, the delivery is still in the sender's unsettled map, and the sender sends the message again once the link is recovered

If the disposition frame is lost, the sender likewise still considers the delivery unsettled and sends the message again

The receiver can therefore get duplicate messages. It is a two-step process

amqp_13
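The sender side of at-least-once delivery boils down to bookkeeping on the unsettled map. The sketch below models only that bookkeeping with plain dictionaries; the class and method names are hypothetical, and no real network or library is involved.

```python
class AtLeastOnceSender(object):
    """Keeps every delivery in the unsettled map until the receiver settles it."""

    def __init__(self):
        self.unsettled = {}  # delivery tag -> payload

    def send(self, delivery_tag, payload):
        # Remember the delivery, then emit a transfer with settled = false.
        self.unsettled[delivery_tag] = payload
        return {"delivery_tag": delivery_tag, "settled": False, "payload": payload}

    def on_disposition(self, delivery_tag, settled):
        # A disposition with settled = true lets the sender forget the delivery.
        if settled:
            self.unsettled.pop(delivery_tag, None)

    def to_resend(self):
        # Deliveries that were never settled are candidates for redelivery.
        return sorted(self.unsettled)


sender = AtLeastOnceSender()
sender.send("tag-1", "first")
sender.send("tag-2", "second")
sender.on_disposition("tag-1", settled=True)  # the disposition for tag-2 was lost
print(sender.to_resend())  # ['tag-2']
```

Because the receiver may already have processed `tag-2`, resending it is what makes duplicates possible at this QoS level.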

  3. Exactly once

In this method the receiver processes each message only once; duplicate messages at the receiver are not allowed. AMQP achieves this by requiring acknowledgement from both the sender and the receiver. The sender sends the transfer with settled = false and keeps the delivery tag in its unsettled map

The receiver sends a disposition with settled = false and keeps the delivery tag in its own unsettled map

The sender sends a disposition back with settled = true and removes the delivery tag from its unsettled map.

Upon receiving settled = true from the sender, the receiver also settles and removes the tag from its unsettled map

If the transfer is lost, the sender still holds the delivery as unsettled and sends the message again

If the disposition frame from the receiver is lost, the receiver sends it again. If the receiver doesn’t receive settled = true for the delivery, it sends the disposition with settled = false again

The receiver will get only one message.

amqp_14

 

7.0 Message Transfer

The message is carried as the payload of an AMQP transfer frame, following the transfer performative.

An annotated message consists of the bare message plus sections for annotation at the head and tail of the bare message. There are two classes of annotations: annotations that travel with the message indefinitely, and annotations that are consumed by the next node

The bare message consists of three sections: standard properties, application-properties, and application-data (the body). Application-properties is where you can add your own properties

The bare message is immutable within the AMQP network. That is, none of the sections can be changed by any node acting as an AMQP intermediary

amqp_20

Altogether a message consists of the following sections:

  • Zero or one header.
  • Zero or one delivery-annotations.
  • Zero or one message-annotations.
  • Zero or one properties.
  • Zero or one application-properties.
  • The body consists of one of the following three choices: one or more data sections, one or more amqp-sequence sections, or a single amqp-value section.
  • Zero or one footer.
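The section rules in this list can be checked mechanically. The sketch below assumes the sections arrive in the order listed above and represents a message simply as a list of section names; `validate_sections` is a hypothetical helper, not a library function.

```python
SECTION_ORDER = ["header", "delivery-annotations", "message-annotations",
                 "properties", "application-properties", "body", "footer"]
BODY_KINDS = ("data", "amqp-sequence", "amqp-value")


def validate_sections(sections):
    """Return True if the list of section names forms a well-formed message."""
    body = [name for name in sections if name in BODY_KINDS]
    if not body or len(set(body)) != 1:
        return False  # a body is required and must use a single kind
    if body[0] == "amqp-value" and len(body) != 1:
        return False  # only one amqp-value section is allowed
    ranks = []
    for name in sections:
        slot = "body" if name in BODY_KINDS else name
        if slot not in SECTION_ORDER:
            return False  # unknown section name
        ranks.append(SECTION_ORDER.index(slot))
    body_rank = SECTION_ORDER.index("body")
    for previous, current in zip(ranks, ranks[1:]):
        if current < previous:
            return False  # sections out of order
        if current == previous and current != body_rank:
            return False  # non-body sections appear at most once
    return True


print(validate_sections(["header", "properties", "data", "data", "footer"]))  # True
print(validate_sections(["properties", "header", "amqp-value"]))              # False
```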

Header

The header section carries standard delivery details about the transfer of a message through the AMQP network. If the header section is omitted the receiver MUST assume the appropriate default values (or the meaning implied by no value being set) for the fields within the header unless other target or node specific defaults have otherwise been set

Delivery Annotations

The delivery-annotations section is used for delivery-specific non-standard properties at the head of the message. Delivery annotations convey information from the sending peer to the receiving peer. If the recipient does not understand the annotation it cannot be acted upon and its effects (such as any implied propagation) cannot be acted upon. Annotations might be specific to one implementation, or common to multiple implementations

If the delivery-annotations section is omitted, it is equivalent to a delivery-annotations section containing an empty map of annotations

Message Annotations

The message-annotations section is used for properties of the message which are aimed at the infrastructure and SHOULD be propagated across every delivery step. Message annotations convey information about the message

Properties

Immutable properties of the message. The properties section is used for a defined set of standard properties of the message. The properties section is part of the bare message; therefore, if retransmitted by an intermediary, it MUST remain unaltered

Application-properties

The application-properties section is a part of the bare message used for structured application data. Intermediaries can use the data within this structure for the purposes of filtering or routing. The keys of this map are restricted to be of type string (which excludes the possibility of a null key) and the values are restricted to be of simple types only, that is, excluding map, list, and array types

amqp_21
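The restrictions on application-properties (string keys, simple values only) can be expressed as a small check. This is a sketch written for Python 3, where `str` is the text type; the function name is made up, and a client library performs its own validation when encoding.

```python
def validate_application_properties(props):
    """Raise TypeError if props breaks the application-properties rules."""
    for key, value in props.items():
        if not isinstance(key, str):
            raise TypeError("application-properties keys must be strings")
        if isinstance(value, (dict, list, tuple, set)):
            raise TypeError("values must be simple types, not map/list/array")
    return True


print(validate_application_properties({"Divine": "Life", "priority": 4}))  # True
```

Nested structures belong in the message body rather than in application-properties.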

 

8.0 Demo

For the demo we will use RabbitMQ as the broker and Qpid Proton Python as the client

  • Follow this link to install RabbitMQ. Note: I am using Ubuntu 16.04 LTS

https://tecadmin.net/install-rabbitmq-server-on-ubuntu/

  • Enable the AMQP 1.0 plugin

$ sudo rabbitmq-plugins list
$ sudo rabbitmq-plugins enable rabbitmq_amqp1_0

  • Check the RabbitMQ server status

$ sudo service rabbitmq-server status

  • Install the Python Qpid Proton package

$ sudo apt-get update
$ sudo apt-get install python-qpid-proton

 

Demo setup

The demo is set up on an Ubuntu 16.04 VM.

amqp_11

Two programs are created using the Qpid Proton library: 1) a producer and 2) a consumer. The producer sends 100 messages to the RabbitMQ broker and the consumer retrieves the messages from the broker. The destination queue in the broker is examples (the /examples part of the address localhost:5672/examples)

Open a web browser at localhost:15672 to reach the RabbitMQ web UI (this requires the rabbitmq_management plugin to be enabled), log in with login/password guest/guest, then click on Queues and the examples queue. Here you can see the message count increase when the broker receives messages from the producer

Below are the producer and consumer programs

producer.py. This program sends 100 messages with application properties and waits for confirmation from the broker

from __future__ import print_function
import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container


class Send(MessagingHandler):
    def __init__(self, url, messages):
        super(Send, self).__init__()
        self.url = url
        self.sent = 0          # messages handed to the link so far
        self.confirmed = 0     # messages accepted by the broker so far
        self.total = messages  # number of messages to send

    def on_start(self, event):
        # Open the connection, session and sending link to the queue address
        event.container.create_sender(self.url)

    def on_sendable(self, event):
        # Called when the broker grants link credit; send while credit remains
        while event.sender.credit and self.sent < self.total:
            msg = Message(id=(self.sent + 1),
                          body={'sequence': (self.sent + 1)},
                          properties={'Divine': 'Life'})
            event.sender.send(msg)
            self.sent += 1

    def on_accepted(self, event):
        # The broker accepted one delivery
        self.confirmed += 1
        if self.confirmed == self.total:
            print("all messages confirmed")
            event.connection.close()

    def on_disconnected(self, event):
        # After a disconnect, resend everything that was not confirmed
        self.sent = self.confirmed


parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Send messages to the supplied address.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address to which messages are sent (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to send (default %default)")
opts, args = parser.parse_args()

try:
    Container(Send(opts.address, opts.messages)).run()
except KeyboardInterrupt:
    pass

consumer.py. This program retrieves the messages from the broker

from __future__ import print_function
import optparse
from proton.handlers import MessagingHandler
from proton.reactor import Container


class Recv(MessagingHandler):
    def __init__(self, url, count):
        super(Recv, self).__init__()
        self.url = url
        self.expected = count  # number of messages to receive; 0 means no limit
        self.received = 0      # messages received so far

    def on_start(self, event):
        # Open the connection, session and receiving link to the queue address
        event.container.create_receiver(self.url)

    def on_message(self, event):
        if event.message.id and event.message.id < self.received:
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print(event.message.body)
            print(event.message.properties)
            self.received += 1
            if self.received == self.expected:
                # All expected messages have arrived; shut down cleanly
                event.receiver.close()
                event.connection.close()


parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address from which messages are received (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to receive; 0 receives indefinitely (default %default)")
opts, args = parser.parse_args()

try:
    Container(Recv(opts.address, opts.messages)).run()
except KeyboardInterrupt:
    pass

Open a terminal and execute producer.py

$ python producer.py

Open a browser and point it to the URL localhost:15672. Click on Queues and then examples. You should see 100 messages in the examples queue

amqp_24

Open another terminal and execute the consumer.py program. You should see all 100 messages printed on screen and the message count of the queue in the RabbitMQ server drop to zero

$ python consumer.py

 
