Recently I started developing some custom Flux plugins that I wanted to share with Flux users. It is always fun to develop custom plugins and use them within a Flux workflow. Our users are more used to the concept of custom actions and triggers, but I believe "plugin" is the more generic and widely used term these days. Flux 7.10 ships with over 50 built-in plugins (actions and triggers) that cover most common customer requirements (e.g., TimerTrigger, ProcessAction, MailAction, Split, Join, and a slew of file-based triggers and actions). Writing your own is child's play, as Flux uses the plain old JavaBean framework to build custom actions and triggers that can be used seamlessly in your workflows. Some of our customers write their own based on their requirements, and sometimes we quickly develop one for them. All you need is a Java API (e.g., RabbitMQ integration) or an HTTP interface (e.g., Twitter integration) to build one yourself.
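To give a feel for what "plain old JavaBean" means in practice, here is a minimal sketch using only the standard java.beans package. GreetingAction and BeanPropertyLister are hypothetical names I made up for illustration; they are not part of the Flux API, and Flux's own discovery mechanism may well differ. The point is only that getter/setter pairs are enough for a framework to find a class's configurable properties.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

class BeanPropertyLister {
    // Returns the names of all properties that have both a getter and a setter,
    // as discovered by the standard JavaBeans introspector.
    static List<String> writableProperties(Class<?> beanClass) {
        try {
            // Stop at Object so the inherited "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writableProperties(GreetingAction.class));
    }
}

// Hypothetical bean for illustration only; NOT a class from the Flux API.
class GreetingAction {
    private String recipient;
    private int repeatCount = 1;

    public String getRecipient() { return recipient; }
    public void setRecipient(String recipient) { this.recipient = recipient; }

    public int getRepeatCount() { return repeatCount; }
    public void setRepeatCount(int repeatCount) { this.repeatCount = repeatCount; }
}
```

Running it lists the two properties, recipient and repeatCount, which is the kind of information a designer UI could use to render an editable form for an action.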

Custom plugins make it a lot easier to manage your workflow's integration points with external systems while enjoying the many Flux benefits such as scheduling, file transfer, error handling, variable substitution, runtime data mapping, job dependencies, and, most importantly, real-time monitoring from within a web browser.

Developing applications around Twitter is always fun thanks to its well-documented REST APIs. The Twitter notification plugin supports OAuth for authentication when accessing protected resources; it obtains the OAuth access token and invokes the protected resource behind the scenes, without any user intervention.
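For the curious, part of that behind-the-scenes work is signing each request. The following is a rough sketch of OAuth 1.0a HMAC-SHA1 request signing as described in RFC 5849, written with the JDK only. It is not the plugin's actual code; the class name OAuth1SignatureSketch and all keys, secrets, nonce, and timestamp values are placeholders I chose for illustration, and the sketch ignores repeated parameter names for brevity.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class OAuth1SignatureSketch {
    // Percent-encodes per RFC 3986: only letters, digits, '-', '.', '_', '~' stay as-is.
    static String percentEncode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A")
                    .replace("%7E", "~");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Builds METHOD&encoded-url&encoded-sorted-parameters.
    static String signatureBaseString(String method, String baseUrl, Map<String, String> params) {
        TreeMap<String, String> sorted = new TreeMap<>();
        for (Map.Entry<String, String> e : params.entrySet()) {
            sorted.put(percentEncode(e.getKey()), percentEncode(e.getValue()));
        }
        StringBuilder normalized = new StringBuilder();
        for (Map.Entry<String, String> e : sorted.entrySet()) {
            if (normalized.length() > 0) normalized.append('&');
            normalized.append(e.getKey()).append('=').append(e.getValue());
        }
        return method.toUpperCase() + "&" + percentEncode(baseUrl)
                + "&" + percentEncode(normalized.toString());
    }

    // Signs the base string with HMAC-SHA1; the key is consumerSecret&tokenSecret.
    static String sign(String baseString, String consumerSecret, String tokenSecret) {
        try {
            String key = percentEncode(consumerSecret) + "&" + percentEncode(tokenSecret);
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            byte[] raw = mac.doFinal(baseString.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(raw);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // All values below are placeholders, not real credentials.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("oauth_consumer_key", "example-consumer-key");
        params.put("oauth_token", "example-access-token");
        params.put("oauth_nonce", "example-nonce");
        params.put("oauth_timestamp", "1300000000");
        params.put("oauth_signature_method", "HMAC-SHA1");
        params.put("oauth_version", "1.0");
        String base = signatureBaseString("GET",
                "http://api.twitter.com/1/statuses/mentions.json", params);
        System.out.println(base);
        System.out.println(sign(base, "example-consumer-secret", "example-token-secret"));
    }
}
```

The resulting signature would normally travel in the Authorization header as oauth_signature, alongside the other oauth_ parameters.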

The next thing I developed was a messaging plugin based on RabbitMQ. RabbitMQ is a popular, high-performance AMQP implementation built in Erlang. Using the RabbitMQ plugin, users can publish messages to and consume messages from its queues, and it supports a variety of topologies (fanout, direct, etc.). RabbitMQ provides a decent Java API to interface with the broker, so integrating it with Flux is a no-brainer. Now comes the interesting part: I need to show something so I can keep my long story short :-)

Here is a simple Flux web designer screenshot of the Twitter plugin configured to send a direct message to a user:

Here is a simple Flux web designer screenshot of the RabbitMQ plugin configured to publish a message to a queue:

Here is a simple Flux web designer screenshot of the RabbitMQ plugin configured to consume messages from a queue:

The next thing you may want to do is try it out yourself. Maven users should include these dependencies in their POM:

<dependency>
  <groupId>flux</groupId>
  <artifactId>flux</artifactId>
  <version>7.10.1</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>rabbitmq-action</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>rabbitmq-trigger</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>twitter-action</artifactId>
  <version>1.0.0</version>
</dependency>

Then add the following repository to the repositories tag in your POM:

<repository>
  <id>maven2-repository.dev.java.net</id>
  <name>Java.net Repository for Maven</name>
  <url>http://download.java.net/maven/2/</url>
  <layout>default</layout>
</repository>

Non-Maven users can download the plugins from the project website. If you wish to use the Java APIs, here are some examples to get you started. Complete examples are available in the source repository.

Publishing an AMQP message to a RabbitMQ channel would look something like this using the Java API:

FlowChart flowChart = helper.makeFlowChart("RabbitMQ Action Example");
RabbitMQActionFactory customActionFactory = (RabbitMQActionFactory) flowChart.makeFactory("RabbitMQActionFactory");
RabbitMQAction rabbitMQAction = customActionFactory.makeRabbitMQAction("Publish AMQP Message");
rabbitMQAction.setHost("localhost");
rabbitMQAction.setUsername("guest");
rabbitMQAction.setPassword("guest");
rabbitMQAction.setExchangeName("nasdaq.exchange");
rabbitMQAction.setExchangeType("direct");
rabbitMQAction.setQueueName("nasdaq.request");
rabbitMQAction.setQueueType("shared");
rabbitMQAction.setRoutingKey("nasdaq.request");
rabbitMQAction.setMessage("BUY GOOG 200");

//Schedule this job
String name = engine.put(flowChart);
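
The exchange type and routing key set above decide which queues end up receiving the message. Here is a tiny in-memory sketch of how direct and fanout exchanges differ. It is a simplified model of AMQP routing for illustration, not RabbitMQ or plugin code; ExchangeRoutingSketch and the nyse.request queue are names I made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExchangeRoutingSketch {
    private final String type; // "direct" or "fanout"
    // queue name -> binding key, kept in insertion order
    private final Map<String, String> bindings = new LinkedHashMap<>();

    ExchangeRoutingSketch(String type) {
        this.type = type;
    }

    void bind(String queueName, String bindingKey) {
        bindings.put(queueName, bindingKey);
    }

    // Returns the queues that would receive a message published with this routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            // fanout ignores the routing key; direct requires an exact match
            boolean matches = "fanout".equals(type) || binding.getValue().equals(routingKey);
            if (matches) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeRoutingSketch direct = new ExchangeRoutingSketch("direct");
        direct.bind("nasdaq.request", "nasdaq.request");
        direct.bind("nyse.request", "nyse.request");
        System.out.println(direct.route("nasdaq.request")); // prints [nasdaq.request]

        ExchangeRoutingSketch fanout = new ExchangeRoutingSketch("fanout");
        fanout.bind("nasdaq.request", "nasdaq.request");
        fanout.bind("nyse.request", "nyse.request");
        System.out.println(fanout.route("nasdaq.request")); // prints [nasdaq.request, nyse.request]
    }
}
```

With a direct exchange, "BUY GOOG 200" reaches only the queue bound with the matching key; with fanout, every bound queue gets a copy.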

Consuming a message with RabbitMQTrigger, which monitors the RabbitMQ queue for messages, would look something like this using the Java API:

FlowChart flowChart = helper.makeFlowChart("RabbitMQ Trigger Example");
RabbitMQTriggerFactory customTriggerFactory = (RabbitMQTriggerFactory) flowChart.makeFactory("RabbitMQTriggerFactory");
RabbitMQTrigger rabbitMQTrigger = customTriggerFactory.makeRabbitMQTrigger("Consume AMQP Message");
rabbitMQTrigger.setHost("localhost");
rabbitMQTrigger.setUsername("guest");
rabbitMQTrigger.setPassword("guest");
rabbitMQTrigger.setQueueName("nasdaq.request");

JavaAction messageProcessor = flowChart.makeJavaAction("Message Processor");
messageProcessor.setListener(MessageProcessor.class);

rabbitMQTrigger.addFlow(messageProcessor);

//loop back required for polling
messageProcessor.addFlow(rabbitMQTrigger);

//Schedule this job
String name = engine.put(flowChart);

The Twitter plugin allows access to OAuth-protected resources. For example, retrieving Twitter mentions would look something like this using the Java API:

FlowChart flowChart = helper.makeFlowChart("Twitter Action Example");
TwitterActionFactory customActionFactory = (TwitterActionFactory) flowChart.makeFactory("TwitterActionFactory");
TwitterAction twitterMentions = customActionFactory.makeTwitterAction("Twitter Action");
twitterMentions.setConsumerKey(CONSUMER_KEY);
twitterMentions.setConsumerSecret(CONSUMER_SECRET);
twitterMentions.setOAuthEnabled(true);
twitterMentions.setRequestMethod(RestActionType.GET);
twitterMentions.setResourceUrl("http://api.twitter.com/1/statuses/mentions.json");
twitterMentions.setUsername(TWITTER_USER);
twitterMentions.setPassword(TWITTER_PASS);

//Schedule this job
engine.put(flowChart);

Similarly, sending a direct message would look something like this using the Java API:

TwitterAction directMessage = customActionFactory.makeTwitterAction("Twitter DM");
directMessage.setConsumerKey(CONSUMER_KEY);
directMessage.setConsumerSecret(CONSUMER_SECRET);
directMessage.setOAuthEnabled(true);
directMessage.setRequestMethod(RestActionType.POST);
directMessage.setResourceUrl("http://api.twitter.com/1/direct_messages/new.xml");
directMessage.setUsername(TWITTER_USER);

//add the required parameters
Properties properties = new Properties();
properties.put("screen_name", "fluxarul");
properties.put("user_id", "97133917");
properties.put("text", "Hello from Flux Twitter Action!");

directMessage.setParameters(properties);
directMessage.setPassword(TWITTER_PASS);

//Schedule this job
engine.put(flowChart);
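
If you are wondering how a Properties object like the one above could turn into a POST request, here is a small sketch that encodes it as an application/x-www-form-urlencoded body using only the JDK. I am assuming the parameters travel as a form body; how the plugin actually transmits them is not shown here, and FormBodySketch is a hypothetical name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Properties;
import java.util.TreeSet;

class FormBodySketch {
    // Encodes string properties as name=value pairs joined by '&', sorted by name
    // so the output is stable.
    static String toFormBody(Properties params) {
        StringBuilder body = new StringBuilder();
        for (String name : new TreeSet<>(params.stringPropertyNames())) {
            if (body.length() > 0) {
                body.append('&');
            }
            body.append(encode(name)).append('=').append(encode(params.getProperty(name)));
        }
        return body.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("screen_name", "fluxarul");
        properties.put("user_id", "97133917");
        properties.put("text", "Hello from Flux Twitter Action!");
        // prints screen_name=fluxarul&text=Hello+from+Flux+Twitter+Action%21&user_id=97133917
        System.out.println(toFormBody(properties));
    }
}
```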

The Flux plugins project is hosted at bitbucket.org. Feel free to check it out, and give a shout if you have any feedback, or log an issue for bugs, improvements, or support for additional plugins.
