Wednesday, June 29, 2016

Connecting CloverETL and Amazon SQS

A recent post on the CloverETL blog, Building own components in CloverETL, focused on how to build your own component rather than on delivering a production-ready solution. In this article, I’d like to introduce a plugin based on the things covered there: a connector to Amazon SQS, AWS’s implementation of a queuing service. Unfortunately, SQS does not come with a native JMS interface (although an adapter is available), so we cannot connect to it using the standard CloverETL JMS connector. I tried, but failed pretty badly, since this adapter requires specific configuration which can be done only at the Java code level, and it does not even support JNDI. If you find a way to use SQS via JNDI, please share your findings in the comments below; it would be very helpful not only for me but for other people as well.
Good news for users of version 4.2.0 and above: the Amazon SDK (required for this connector to work) is bundled with the product itself, so you don’t need to download it separately. Others unfortunately need to download the SDK from Amazon’s website along with some of its dependencies (all of those are provided in the attached project).

Introducing Amazon SQS connector family

Figure 1: Amazon SQS plugin components
There are two components:
  1. AmazonSqsReader – taps into the interface and retrieves either a defined number of messages or all pending messages
    • The component provides two types of metadata: messages (first output port) and message attributes (second, optional output port)
  2. AmazonSqsWriter – pushes new messages into the queue
    • The first input port accepts any metadata but requires the Body attribute option to be set; the contents of that field are passed as the actual message
    • The second (optional) input port is used for message attributes; every message may contain up to 10 of them (Amazon SQS limit)
    • If the second input port is used, both ports need to be sorted by Join key, which then becomes a mandatory attribute; the same applies to Attribute mapping

Plugin installation

Introducing this plugin into your project is fairly simple:
  1. Take the attached zip file and decompress it into your project’s root directory
  2. Open the workspace.prm file and add a new parameter PLUGINS_DIR with the value ${PROJECT}/plugins
  3. (Version 4.2.0 and newer) Add <classpathentry kind="lib" path="plugins/amazonsqs/lib/cloveretl-amazonsqs-connector-0.2a.jar"/> to your .classpath file
  4. (Version 4.1.x) Add all libraries from the plugins/amazonsqs/lib directory (as covered in the previous article)
  5. Set the master password for your ETL Runtime in Window → Preferences (or in the Configuration section of your CloverETL Server)
Note that this plugin won’t work in any version prior to 4.1.0!
Figure 2: Setting up master password in CloverETL Developer

AmazonSqsReader

It does as its name suggests: it connects to the SQS interface and gets the messages waiting in a queue. It can also delete messages once they have been read.

Configuration options shared by both components:

  • Queue name – name of the queue to tap into (e.g. MyCloverQueue),
  • Queue endpoint – URL of the region where the queue resides (e.g. https://sqs.us-east-1.amazonaws.com),
  • AWS Access key – access key of the account used to access this interface (e.g. ZQRTP1NM0ZDWKEYRVC3U),
  • AWS Secret key – secret key belonging to the credentials provided in the access key field.

Configuration specific to reader:

  • Message threshold – maximum number of messages to read from the interface
  • Remove read messages – whether messages read from the queue should be removed after a successful read

Output port data:

The reader uses metadata propagation to push predefined metadata out of the component; it is not possible to change it.
  • Out0 (mandatory): message output
  • Out1 (optional): message attribute output
    • Field messageId corresponds to the id of the message on the message output; these are the IDs assigned by SQS
This component works in batch mode, which means that it tries to read as many messages as possible through the interface until the threshold is reached or all messages are read.
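The batch read described above can be sketched roughly as follows. This is a minimal illustration and not the plugin's actual source: QueueClient, InMemoryQueue and readBatch are hypothetical names, and the in-memory queue only stands in for SQS so the loop can run without AWS credentials. The one SQS fact it relies on is that a single receive request returns at most 10 messages. Against the real service, an empty response does not strictly prove that the queue is empty, and messages that are not deleted become visible again later, so treat the stop condition here as a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch of a threshold-bound batch read; all names here are illustrative. */
class SqsReadLoopSketch {

    /** Hypothetical stand-in for a queue client (not an AWS SDK type). */
    interface QueueClient {
        /** Returns up to maxMessages pending messages; an empty list means nothing was available. */
        List<String> receive(int maxMessages);

        /** Removes a message that has already been read. */
        void delete(String message);
    }

    /** SQS returns at most 10 messages per receive request. */
    static final int MAX_PER_REQUEST = 10;

    /** Reads until the threshold is reached or a receive call comes back empty. */
    static List<String> readBatch(QueueClient client, int threshold, boolean removeRead) {
        List<String> result = new ArrayList<>();
        while (result.size() < threshold) {
            int wanted = Math.min(MAX_PER_REQUEST, threshold - result.size());
            List<String> chunk = client.receive(wanted);
            if (chunk.isEmpty()) {
                break; // nothing left to read before the threshold was reached
            }
            for (String message : chunk) {
                result.add(message);
                if (removeRead) {
                    client.delete(message); // mirrors the "Remove read messages" option
                }
            }
        }
        return result;
    }

    /** In-memory queue used only to exercise the loop. */
    static final class InMemoryQueue implements QueueClient {
        final Deque<String> pending = new ArrayDeque<>();
        final List<String> deleted = new ArrayList<>();

        InMemoryQueue(int count) {
            for (int i = 0; i < count; i++) {
                pending.add("msg-" + i);
            }
        }

        @Override
        public List<String> receive(int maxMessages) {
            List<String> out = new ArrayList<>();
            while (out.size() < maxMessages && !pending.isEmpty()) {
                out.add(pending.poll());
            }
            return out;
        }

        @Override
        public void delete(String message) {
            deleted.add(message);
        }
    }

    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue(25);
        List<String> read = readBatch(queue, 12, true);
        System.out.println(read.size() + " read, " + queue.pending.size() + " still pending");
    }
}
```

With a threshold of 12 and 25 queued messages, the loop issues one request for 10 messages and a second one for the remaining 2, then stops.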

AmazonSqsWriter

This one also does as its name suggests: it flushes a data stream into an SQS queue. It supports both message and message attribute data streams. The method of linking the two streams is very similar to the algorithm used in the ExtMergeJoin component, which means that if the optional second input is connected to the component, both data streams need to be sorted!
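To show why both inputs have to be sorted, here is a minimal sketch of a merge-style join between messages and their attributes. It is not the plugin's code: the Message, Attribute and Outgoing types are hypothetical, join keys are assumed to be plain strings that are unique per message, and the handling of unmatched attributes (skipped) and of more than 10 attributes (rejected) is my assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of a single-pass merge join over two inputs sorted by the same key. */
class SqsMergeJoinSketch {

    /** One record from input port 0: join key plus message body. */
    static final class Message {
        final String key;
        final String body;

        Message(String key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    /** One record from input port 1: join key plus a single name/value pair. */
    static final class Attribute {
        final String key;
        final String name;
        final String value;

        Attribute(String key, String name, String value) {
            this.key = key;
            this.name = name;
            this.value = value;
        }
    }

    /** A message ready to be sent: body plus the attributes collected for it. */
    static final class Outgoing {
        final String body;
        final Map<String, String> attributes = new LinkedHashMap<>();

        Outgoing(String body) {
            this.body = body;
        }
    }

    /** SQS allows at most 10 attributes per message. */
    static final int MAX_ATTRIBUTES = 10;

    /** Both lists must be sorted ascending by key; each list is walked exactly once. */
    static List<Outgoing> join(List<Message> messages, List<Attribute> attributes) {
        List<Outgoing> result = new ArrayList<>();
        int a = 0; // cursor into the attribute stream
        for (Message m : messages) {
            Outgoing out = new Outgoing(m.body);
            // Skip attributes whose key sorts before the current message (no matching message).
            while (a < attributes.size() && attributes.get(a).key.compareTo(m.key) < 0) {
                a++;
            }
            // Collect every attribute that carries the current message's key.
            while (a < attributes.size() && attributes.get(a).key.equals(m.key)) {
                Attribute attr = attributes.get(a++);
                out.attributes.put(attr.name, attr.value);
                if (out.attributes.size() > MAX_ATTRIBUTES) {
                    throw new IllegalStateException("More than 10 attributes for key " + m.key);
                }
            }
            result.add(out);
        }
        return result;
    }
}
```

Because the attribute cursor never moves backwards, an unsorted input would silently lose attributes, which is the reason the sort requirement matters.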

Configuration specific to writer:

  • Body attribute – field from input port 0 containing message body (default field name is “body”)
  • Join key – (Mandatory when input port 1 is connected) key used to link message with its attributes
  • Attribute mapping – (Optional when input port 1 is connected) maps input metadata to SQS message attribute object
This component does NOT run in batch mode: every message is sent separately, which may slow down processing.
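For comparison, a batch-mode writer would first have to group messages into chunks, since an SQS batch send request accepts at most 10 entries. The sketch below shows only that grouping step, under the assumption that messages are already collected in a list. The partition helper is a hypothetical name, it is not part of the plugin, and it ignores other limits such as the total payload size.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of splitting messages into groups small enough for one batch request. */
class SqsBatchingSketch {

    /** An SQS batch send request accepts at most 10 entries. */
    static final int MAX_BATCH_ENTRIES = 10;

    /** Splits the input into consecutive chunks of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> messages, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < messages.size(); from += batchSize) {
            int to = Math.min(from + batchSize, messages.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(messages.subList(from, to)));
        }
        return batches;
    }
}
```

Each resulting chunk would then be sent in a single request instead of one request per message.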

Conclusion

The plugin introduced here was created to satisfy needs I had during a project’s development, so it may not be optimized for every situation. Making the writer batch-mode ready is my next to-do improvement; I plan to implement it over the summer if beer and hikes don’t keep me too busy :)

Download plugin version 0.2a
