layout slide
title ActiveMQ

ActiveMQ

Open source multi-protocol Messaging Broker.

--

Advantages


Installing

ActiveMQ 5.15.x, the version used in these slides, requires Java 8 or later to run and to build.

Brew (on MacOS)

brew install apache-activemq
activemq start

Unix Binary Installation

Download the latest version here

Then follow the documentation

--

Docker

https://hub.docker.com/r/rmohr/activemq/

docker pull rmohr/activemq
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq

--

CLI commands

activemq start - Creates and starts a broker using a configuration file.

activemq stop - Stops a running broker

activemq restart - Restarts a running broker

--

Start ActiveMQ

activemq start

INFO: Loading '/usr/local/Cellar/activemq/5.15.9/libexec//bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/local/Cellar/activemq/5.15.9/libexec//data/activemq.pid' (pid '61388')

--

Monitoring ActiveMQ

You can monitor ActiveMQ using the Web Console by pointing your browser at

http://localhost:8161/admin

Default credentials

login: admin
pass: admin

--


Messaging Patterns

--

Queue

Queues are the most obvious messaging pattern implemented by ActiveMQ. They provide a direct channel between a producer and a consumer. The producer creates messages, and the consumer reads them one after another. Once a message has been consumed, it is gone. If multiple consumers are registered for a queue, only one of them receives each message.
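
A minimal in-process sketch of this "only one consumer gets each message" behaviour. It uses Ruby's standard thread-safe `Queue` as a stand-in for a broker queue, so no ActiveMQ is involved; `run_queue_demo` is an illustrative name.

```ruby
# In-process illustration only: Ruby's Queue stands in for a broker queue.
def run_queue_demo(messages, consumer_count: 2)
  queue = Queue.new
  received = Queue.new # collects [consumer_id, message] pairs

  consumers = consumer_count.times.map do |id|
    Thread.new do
      # pop returns nil once the queue is closed and drained
      while (msg = queue.pop)
        received << [id, msg]
      end
    end
  end

  messages.each { |m| queue << m }
  queue.close
  consumers.each(&:join)

  Array.new(received.size) { received.pop }
end

deliveries = run_queue_demo(%w[order-1 order-2 order-3])
```

Each message appears in `deliveries` exactly once, paired with whichever consumer happened to take it.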

--

Topic

Topics implement a one-to-many channel between a producer and multiple consumers. Unlike a queue, every consumer receives each message sent by the producer.
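
The same idea as a minimal in-process sketch. `DemoTopic` is a hypothetical class standing in for a broker topic; it only illustrates that every subscriber gets its own copy.

```ruby
# In-process illustration only: a tiny publish/subscribe hub, not a broker.
class DemoTopic
  def initialize
    @subscribers = []
  end

  # Register a block that is called for every published message.
  def subscribe(&handler)
    @subscribers << handler
  end

  # Deliver the message to every current subscriber.
  def publish(message)
    @subscribers.each { |handler| handler.call(message) }
  end
end

topic = DemoTopic.new
inbox_a = []
inbox_b = []
topic.subscribe { |m| inbox_a << m }
topic.subscribe { |m| inbox_b << m }
topic.publish('order-1')
```

Both inboxes end up holding the message. A subscriber registered after `publish` would miss it, which mirrors how non-durable topic subscriptions behave.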

--

Virtual Topics

Virtual topics combine both approaches. While the producer sends messages to a topic, consumers will receive a copy of the message on their own dedicated queue.
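
A small sketch of the destination names involved, assuming the broker keeps ActiveMQ's default virtual-topic naming convention (`VirtualTopic.*` for the topic, `Consumer.<name>.VirtualTopic.*` for each consumer queue). The helper names and the `Orders` / `billing` values are illustrative.

```ruby
# Assumes ActiveMQ's default virtual-topic naming; a broker may be configured differently.

# Destination the producer publishes to.
def virtual_topic_destination(name)
  "/topic/VirtualTopic.#{name}"
end

# Dedicated queue that one logical consumer reads its copies from.
def virtual_topic_consumer_queue(consumer, name)
  "/queue/Consumer.#{consumer}.VirtualTopic.#{name}"
end

publish_to = virtual_topic_destination('Orders')
billing_queue = virtual_topic_consumer_queue('billing', 'Orders')
shipping_queue = virtual_topic_consumer_queue('shipping', 'Orders')
```

Several processes reading `billing_queue` would share the work like any queue consumers, while `shipping_queue` receives its own full copy of the stream.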


Protocols

AMQP, AUTO, MQTT, OpenWire, REST, RSS and Atom, Stomp, WSIF, WS Notification, XMPP

https://activemq.apache.org/protocols.html

--

REST

ActiveMQ implements a RESTful API to messaging which allows any web-capable device to publish or consume messages using a regular HTTP POST or GET.

--

Publish to Queue

curl -u admin:admin -d "body=order_id" "http://localhost:8161/api/message/shop?type=queue"

--

Publish to Topic

curl -u admin:admin -d "body=order_id" "http://localhost:8161/api/message/shop?type=topic"
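
The same request can be sketched in Ruby with the standard `net/http` library. `build_publish_request` is a hypothetical helper; the URL, credentials and form field are taken from the curl call, and the request is only built here, not sent.

```ruby
require 'net/http'
require 'uri'

# Builds (but does not send) the POST request equivalent to the curl call.
def build_publish_request(destination, body, type: 'queue', base: 'http://localhost:8161')
  uri = URI("#{base}/api/message/#{destination}")
  uri.query = URI.encode_www_form(type: type)

  request = Net::HTTP::Post.new(uri)
  request.basic_auth('admin', 'admin')  # default credentials from the slides
  request.set_form_data('body' => body) # same form field as curl -d "body=..."
  request
end

request = build_publish_request('shop', 'order_id', type: 'topic')

# To actually send it (requires a running broker):
# Net::HTTP.start(request.uri.hostname, request.uri.port) { |http| http.request(request) }
```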

--


Integration with Ruby

--

STOMP

The Simple Text Oriented Messaging Protocol

STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among many languages, platforms and brokers.
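
To make the wire format concrete, here is a rough sketch of how a STOMP frame is laid out: a command line, header lines, a blank line, the body, and a terminating NUL byte. `stomp_frame` is an illustrative helper, not part of any gem, and it ignores header escaping and `content-length`.

```ruby
# Serialises a STOMP frame: command line, header lines, blank line, body, NUL terminator.
# Simplified: no header escaping and no content-length header.
def stomp_frame(command, headers = {}, body = '')
  header_lines = headers.map { |key, value| "#{key}:#{value}\n" }.join
  "#{command}\n#{header_lines}\n#{body}\0"
end

frame = stomp_frame('SEND', { 'destination' => '/queue/user-notifications' }, '{"order_id":1}')
```

Because a frame is plain text, any language that can open a TCP socket can talk to the broker.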

--

STOMP

A Ruby gem for sending and receiving messages from a STOMP-compliant message broker. Includes failover logic and SSL support.

Gemfile

gem 'stomp'
bundle install

--

Initialize Connection

def config_hash
  { 
    hosts: [
      { 
        login: 'admin', 
        passcode: 'admin',
        host: 'localhost',
        port: 61613,
        ssl: false 
      }
    ]
  }
end

client = Stomp::Client.new(config_hash)
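
Outside a local demo, the credentials would normally not be hard-coded. A possible sketch that builds the same hash from environment variables; the `ACTIVEMQ_*` variable names and the `config_hash_from_env` helper are assumptions for illustration only.

```ruby
# Builds the connection hash from environment variables (hypothetical names),
# falling back to local development defaults.
def config_hash_from_env(env = ENV)
  {
    hosts: [
      {
        login: env.fetch('ACTIVEMQ_LOGIN', 'admin'),
        passcode: env.fetch('ACTIVEMQ_PASSCODE', 'admin'),
        host: env.fetch('ACTIVEMQ_HOST', 'localhost'),
        port: Integer(env.fetch('ACTIVEMQ_PORT', '61613')),
        ssl: env.fetch('ACTIVEMQ_SSL', 'false') == 'true'
      }
    ]
  }
end

# A plain Hash can stand in for ENV, which keeps the helper easy to test.
config = config_hash_from_env('ACTIVEMQ_HOST' => 'mq.internal', 'ACTIVEMQ_PORT' => '61614')
```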

--

Send a Message to Queue

require 'json'
require 'stomp'

client = Stomp::Client.new(config_hash)

data = { order_id: 1, command: :paid }

# Serialise the payload to JSON before publishing it to the queue
client.publish('/queue/user-notifications', data.to_json)

client.close

--

Receive a Message from Queue

client = Stomp::Client.new(config_hash)

Thread.new do
  client.subscribe('/queue/user-notifications') do |msg|
    begin
      # Use a separate name so the STOMP message object is not shadowed
      payload = JSON.parse(msg.body)

      # message processing...
    rescue StandardError => e
      Raven.capture_exception(e)
    end
  end
end
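
One way to keep the subscribe block small is to move the processing into a plain method that can be tested without a broker. `handle_notification` below is a hypothetical example; note that JSON has no symbol type, so the producer's `:paid` arrives as the string `"paid"`.

```ruby
require 'json'

# Hypothetical handler, kept free of broker calls so it can be unit-tested.
def handle_notification(body)
  payload = JSON.parse(body)

  # JSON has no symbol type: the producer's :paid arrives as the string "paid".
  case payload['command']
  when 'paid' then "order #{payload['order_id']} marked as paid"
  else "ignored command #{payload['command'].inspect}"
  end
end

body = { order_id: 1, command: :paid }.to_json
result = handle_notification(body)
```

Inside the subscribe block this would be called as `handle_notification(msg.body)`.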

--


Topics

--

Send a Message to Topic

client = Stomp::Client.new(config_hash)

data = { order_id: 1, command: :paid }

client.publish('/topic/user-notifications', data.to_json)

client.close

--

Receive a Message from Topic

client = Stomp::Client.new(config_hash)

Thread.new do
  client.subscribe('/topic/user-notifications') do |msg|
    begin
      # Use a separate name so the STOMP message object is not shadowed
      payload = JSON.parse(msg.body)

      # message processing...
    rescue StandardError => e
      Raven.capture_exception(e)
    end
  end
end

Integration with Rails

--

ActiveMessaging

An attempt to bring the simplicity and elegance of Rails development to the world of messaging.

Gemfile

gem 'activemessaging', github: 'kookster/activemessaging', branch: 'feat/rails5'
gem 'stomp'
bundle install

--

Initializing

rails g active_messaging:install
  create  app/processors/application_processor.rb
  create  script/poller
    chmod  script/poller
  create  script/threaded_poller
    chmod  script/threaded_poller
  create  lib/poller.rb
  create  config/broker.yml
  gemfile  daemons

--

Generate a listener

rails g active_messaging:processor RailsQueue
  create  app/processors/rails_queue_processor.rb
  create  config/messaging.rb
  invoke  rspec
  create    spec/functional/rails_queue_processor_spec.rb

--

Destination config

ActiveMessaging::Gateway.define do |s|
  s.destination :rails_queue, '/queue/RailsQueue'
end

--

Processor

class RailsQueueProcessor < ApplicationProcessor
  subscribes_to :rails_queue

  def on_message(message)
    logger.debug "RailsQueueProcessor received: #{message}"
  end
end

--

Run Application

script/poller run

Production

--

Amazon MQ

https://aws.amazon.com/amazon-mq

  • Managed message broker service for Apache ActiveMQ
  • Uses Apache KahaDB as its data store. Other data stores, such as JDBC and LevelDB, aren't supported.
  • Offers low latency messaging, often as low as single digit milliseconds.
  • Persistence out of the box

--

Endpoints

--

FIFO

http://activemq.apache.org/total-ordering.html

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="&gt;">
        <!--
        The strictOrderDispatchPolicy makes every consumer of a topic
        receive its messages in the same order (total ordering).

        For more information, see: http://activemq.apache.org/total-ordering.html
        -->
        <dispatchPolicy>
          <strictOrderDispatchPolicy/>
        </dispatchPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

The End