Aloha from Hawaii! Shame on me for not blogging for months. Honestly, I have been quite busy: the big move to Hawaii in December and the recent launch of Floify kept me occupied. Although moving to a new place is fun, getting settled on an island can be stressful. Now that all of that is taken care of, there is no good reason not to blog from the rainbow nation 🙂

Recently, I was working with an evaluator on a proof of concept that required scheduling workflows from a web service request. In Flux, this is easily implemented as a trigger, which typically waits for an event to occur. Flux supports popular triggers out of the box, based on a timer, a database condition, a file, the audit trail, or mail. A while ago, I implemented a custom web service trigger that supports both SOAP and HTTP requests and expedites the workflow to the subsequent step. The sample test case below shows how the web service trigger can be used in your Flux workflows.

The sample client code that expedites the trigger is shown below.

With this plugin, your workflows can be triggered by web service requests, and users can configure an embedded web server that runs as part of the workflow. For one-shot workflows, this fits the bill nicely. For a recurring workflow, I would classify this approach as heavyweight, mainly because it spins up a web server for each run of the workflow, which may not be ideal for high-performance workflows. It makes more sense to reuse a single web server instance that accepts requests and triggers a workflow template from the repository. I do not believe this can be supported easily without some core changes in Flux. But it is not the end of the world: there is a more efficient way to implement this in Flux today, using the Flux Cluster APIs.

The Flux 7.11 Operations Console exposes a set of APIs that allow clients to talk to the Flux engine cluster via a simple HTTP interface. In essence, the Flux Opsconsole acts as a web service trigger for the cluster. You can find the documentation of these APIs here. The API that schedules a workflow template from the repository will be available in 7.11.4. If you would like to try it out, you can request a 7.11.4 build from Flux support; we would be happy to send one your way. The Operations Console deploys HTTP resources that can be accessed in a number of ways. If you are a Java shop, you could use something similar to the Gist below. This sample uses the Jersey APIs to POST a multipart request to the endpoint. You just need to toss in restaction.jar, which is part of the Flux distribution.

In this example, we add three properties to a multipart request. The first one, “template”, is required and specifies the name of the template in the repository. The “category” and “rate” properties are optional variables that will be made available in your workflow instance. You can add as many data points as you like to pass on to your workflow instance. You can also customize the name of the workflow instance that is spun off from the repository by setting the optional “namespace” property in the request.
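To make the shape of such a request concrete, here is a minimal sketch that builds a multipart/form-data body with nothing but the JDK. It is only an illustration of the wire format, not the Jersey-based sample: the class name, the boundary string, and the sample values are all made up, and the Operations Console endpoint URL is deliberately left out because it is not given in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flux or Jersey.
class MultipartSketch {

    // Builds a multipart/form-data body with one text part per property.
    static String buildBody(Map<String, String> fields, String boundary) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> field : fields.entrySet()) {
            body.append("--").append(boundary).append("\r\n");
            body.append("Content-Disposition: form-data; name=\"")
                .append(field.getKey()).append("\"\r\n\r\n");
            body.append(field.getValue()).append("\r\n");
        }
        // The closing delimiter carries two extra trailing dashes.
        body.append("--").append(boundary).append("--\r\n");
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("template", "/templates/report"); // required (sample value)
        fields.put("category", "finance");           // optional variable (sample value)
        fields.put("rate", "5");                     // optional variable (sample value)
        fields.put("namespace", "/reports/daily");   // optional instance name (sample value)
        System.out.print(buildBody(fields, "flux-sketch-boundary"));
    }
}
```

A client would send this body in a POST with the header Content-Type: multipart/form-data; boundary=flux-sketch-boundary. In practice a library such as Jersey takes care of these details for you.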

There is another API that might interest you as well. This API schedules a given flow chart on the engine. The major difference here is that you are exporting a workflow file to the engine instead of spinning off an instance from an existing repository template.

Enjoy developing in Flux and Mahalo for stopping by!

Flux’s RestAction is the go-to construct for users orchestrating Flux workflows that involve web service integration. Be it HTTP-based services or SOAP-based services that implement the HTTP binding, RestAction comes in very handy for developers. Flux also has a WebServiceAction, which talks SOAP exclusively, for services that implement WS-* specifications such as Addressing, Security, etc. RestAction has neat XPath integration, which lets users navigate the response with an XPath query and bind the result to a domain model that can be used seamlessly in the workflow context.

In this post, let us look at a simple Weather Web Service which supports both SOAP and HTTP binding. I will be using Eric’s simple test framework to demonstrate the use of RestAction and WebServiceAction in your Flux workflow.

Let us see how to invoke the Weather web service using the traditional WebServiceAction, which uses the WSDL. We pass the zip code as a parameter to the GetCityForecastByZIP operation, and the response is handled by a JavaAction. In this case the response is bound to the ForecastReturn complex type.

package flux.test;

import com.cdyne.ws.weatherws.Forecast;
import com.cdyne.ws.weatherws.ForecastReturn;
import flux.*;
import org.junit.Test;

import java.net.URL;
import java.util.Properties;

public class WeatherWebServicesTest extends AbstractFluxTest {

 @Test
 public void testWebServiceAction() throws Exception {
   log.info("[START] FluxTest.testWebServiceAction");
   String namespace = "/FluxFlowChart";

   FlowChart flowChart = engineHelper.makeFlowChart(namespace);

   WebServiceAction webServiceAction = flowChart.makeWebService("GET Weather Info");

   // Specify the WSDL the web service action is to use.
   webServiceAction.setWsdl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx?WSDL"));

   // Specify the signature of the method to call.
   webServiceAction.setListenerSignature("GetCityForecastByZIP(String)");

   // Specify the arguments to pass to the method.
   webServiceAction.setArgument(0, "59102");

   JavaAction javaAction = flowChart.makeJavaAction("Java Action");
   javaAction.setListener(WebServiceActionListener.class);

   webServiceAction.addFlow(javaAction);

   engine.put(flowChart);

   waitForRuns(namespace, 1, 5, 30);

   log.info("[END] FluxTest.testWebServiceAction");
 }
}

A separate ActionListener implementation, WebServiceActionListener, processes the ForecastReturn from the WebServiceAction.

Let us invoke the same operation using RestAction, which uses the HTTP binding. Here we set the query parameter “ZIP” and bind the response to ForecastReturn POJO (generated using: xjc -wsdl http://wsf.cdyne.com/WeatherWS/Weather.asmx?wsdl).

  @Test
  public void testRestActionGET() throws Exception {
    log.info("[START] FluxTest.testRestActionGET");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the Response type.
    restAction.setResponseType(ForecastReturn.class);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionGET");
  }
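To see what this amounts to on the wire, the base URL plus the query parameters come down to a plain GET URL. The sketch below is a JDK-only illustration of that assembly; the class and method names are made up, and it is not meant to describe how RestAction works internally.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not a Flux class.
class QueryUrlSketch {

    // Appends URL-encoded query parameters to a base URL.
    static String buildUrl(String baseUrl, Map<String, String> queryParams) {
        StringBuilder url = new StringBuilder(baseUrl);
        char separator = '?';
        for (Map.Entry<String, String> param : queryParams.entrySet()) {
            url.append(separator)
               .append(encode(param.getKey()))
               .append('=')
               .append(encode(param.getValue()));
            separator = '&';
        }
        return url.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("ZIP", "59102");
        System.out.println(buildUrl(
            "http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP", params));
    }
}
```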

In this example, we are performing an XPath query that returns an array of Forecast elements.

  @Test
  public void testRestActionXPathArray() throws Exception {
    log.info("[START] FluxTest.testRestActionXPathArray");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    // Specify the XML namespace, if one defined
    Properties namespaces = new Properties();
    namespaces.put("ws", "http://ws.cdyne.com/WeatherWS/");
    restAction.setNamespaces(namespaces);

    // Specify the XPath expression using the defined namespace
    restAction.setXpathExpression("/ws:ForecastReturn/ws:ForecastResult/ws:Forecast");// this xpath returns an array of Forecast elements

    // Specify the XPath response type.
    restAction.setXpathResponseType(Forecast[].class);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionXPathArray");
  }

In this example, we are performing an XPath query that returns the first Forecast element from the array.

  @Test
  public void testRestActionXPathPOJO() throws Exception {
    log.info("[START] FluxTest.testRestActionXPathPOJO");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    // Specify the XML namespace, if one defined
    Properties namespaces = new Properties();
    namespaces.put("ws", "http://ws.cdyne.com/WeatherWS/");
    restAction.setNamespaces(namespaces);

    // Specify the XPath expression using the defined namespace
    restAction.setXpathExpression("/ws:ForecastReturn/ws:ForecastResult/ws:Forecast[1]");// this xpath returns the first element from the list

    // Specify the XPath response type.
    restAction.setXpathResponseType(Forecast.class);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionXPathPOJO");
  }

In this example, we are performing an XPath query that returns the text of the City child element of the ForecastReturn element.

  @Test
  public void testRestActionXPathAttribute() throws Exception {
    log.info("[START] FluxTest.testRestActionXPathAttribute");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    // Specify the XML namespace, if one defined
    Properties namespaces = new Properties();
    namespaces.put("ws", "http://ws.cdyne.com/WeatherWS/");
    restAction.setNamespaces(namespaces);

    // Specify the XPath expression using the defined namespace
    restAction.setXpathExpression("/ws:ForecastReturn/ws:City/text()");// this xpath returns the name of the city

    // Specify the XPath response type.
    restAction.setXpathResponseType(String.class);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionXPathAttribute");
  }
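If you want to try the three XPath expressions above without a Flux engine or network access, the following sketch evaluates them with the JDK's own javax.xml.xpath API. The sample XML is hand-written for illustration: only the element names that appear in the expressions come from this post, while the Date child element and all the values are assumptions, and the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

// Hypothetical class for illustration; it does not use any Flux API.
class XPathSketch {
    static final String WS = "http://ws.cdyne.com/WeatherWS/";

    // Hand-written sample response; values and the Date element are made up.
    static final String SAMPLE =
        "<ForecastReturn xmlns=\"" + WS + "\">"
      + "<City>Billings</City>"
      + "<ForecastResult>"
      + "<Forecast><Date>2012-06-01</Date></Forecast>"
      + "<Forecast><Date>2012-06-02</Date></Forecast>"
      + "</ForecastResult>"
      + "</ForecastReturn>";

    // Evaluates an expression against SAMPLE with the "ws" prefix bound to WS.
    static Object evaluate(String expression, QName resultType) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder().parse(
                new ByteArrayInputStream(SAMPLE.getBytes(StandardCharsets.UTF_8)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                public String getNamespaceURI(String prefix) {
                    return "ws".equals(prefix) ? WS : XMLConstants.NULL_NS_URI;
                }
                public String getPrefix(String uri) {
                    return WS.equals(uri) ? "ws" : null;
                }
                public Iterator<String> getPrefixes(String uri) {
                    return WS.equals(uri)
                        ? Collections.singletonList("ws").iterator()
                        : Collections.<String>emptyIterator();
                }
            });
            return xpath.evaluate(expression, doc, resultType);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        NodeList all = (NodeList) evaluate(
            "/ws:ForecastReturn/ws:ForecastResult/ws:Forecast", XPathConstants.NODESET);
        System.out.println("Forecast elements: " + all.getLength());
        System.out.println("First forecast: " + evaluate(
            "/ws:ForecastReturn/ws:ForecastResult/ws:Forecast[1]", XPathConstants.STRING));
        System.out.println("City: " + evaluate(
            "/ws:ForecastReturn/ws:City/text()", XPathConstants.STRING));
    }
}
```

Note that the prefix is required in every step of the expression: the response uses a default namespace, so an unprefixed path such as /ForecastReturn/City would match nothing.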

The ActionListener implementation that processes various results from the RestAction is shown below.

Here is a screenshot of the test output from IntelliJ.

You need to toss in the following jars from Flux distribution:

flux.jar
lib/restaction.jar
lib/webserviceaction.jar
lib/commons-logging-1.1.1.jar
examples/software_developers/web_services/lib/weatherws-generated.jar

Test dependencies include:

log4j-1.2.15.jar
junit-4.9.jar


Download the latest Flux 7.11 version here and check it out. The samples are available in flux-goodies bitbucket repository. You can also find more samples ($FLUX_HOME/examples/software_developers/web_services and $FLUX_HOME/examples/software_developers/rest_services) in the latest Flux 7.11.2 distribution. Enjoy!

Automated SCP file transfers can be orchestrated in your Flux workflow using SCPAction. This action supports file uploads and downloads using SCP, which is basically a remote copy (rcp) through an SSH tunnel. It is easy to set up key-based SSH login for your systems; here is one article describing the steps for Mac. You can also run arbitrary system commands or scripts using this action.

Here is a screenshot showing an SCPAction configuration that downloads logs from a Linux host.

Here is a screenshot showing an SCPAction configuration that executes a remote disk check command on a Linux host.

Here is a sample test case that shows how this action can be integrated within a Flux workflow using the Java API.

Maven users can just drop this dependency into their POM to use this plugin in their project.

<dependency>
  <groupId>com.fluxcorp.plugins</groupId>
  <artifactId>scp-action</artifactId>
  <version>1.0.3.SNAPSHOT</version>
</dependency>

Make sure you include this Maven repository, where the plugin snapshots are deployed.

<repository>
  <id>sonatype-nexus-snapshots</id>
  <name>sonatype-nexus-snapshots</name>
  <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>

There are a few more interesting plugins available; you can check the project wiki for more details. Download the latest Flux 7.11 version and check out these cool plugins. Enjoy!

One of our customers was interested in performing a basic HTTP file transfer using the Flux APIs. I implemented a trivial HTTP file transfer action in the Flux plugins project. Here is a sample test case that shows how this action can be integrated within a Flux workflow.

Maven users can just drop this dependency into their POM to use this plugin in their project.

<dependency>
  <groupId>com.fluxcorp.plugins</groupId>
  <artifactId>http-filetransfer-action</artifactId>
  <version>1.0.2.SNAPSHOT</version>
</dependency>

Make sure you include this Maven repository, where the plugin snapshots are deployed.

<repository>
  <id>sonatype-nexus-snapshots</id>
  <name>sonatype-nexus-snapshots</name>
  <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>

There are a few more interesting plugins available; you can check the project wiki for more details. Enjoy!

Recently I started developing some custom Flux plugins that I wanted to share with Flux users. It is always fun to develop custom plugins and use them within a Flux workflow. Our users are more used to the concept of custom actions and triggers, but I believe "plugin" is the more generic and widely used term these days. Flux 7.10 ships over 50 built-in plugins (actions and triggers) that cover most of the common customer requirements (for example TimerTrigger, ProcessAction, MailAction, Split, Join, and a slew of file-based triggers and actions). Writing your own is child’s play, as Flux uses the plain old JavaBean framework to build custom actions and triggers that can be used seamlessly in your workflows. Some of our customers write their own based on their requirements, and sometimes we quickly develop one for them. You just need a Java API (e.g. RabbitMQ integration) or an HTTP interface (e.g. Twitter integration) to build one yourself.

Custom plugins make it a lot easier to manage your workflow's integration points with external systems while enjoying loads of Flux benefits such as scheduling, file transfer, error handling, variable substitution, runtime data mapping, job dependencies, and, most importantly, real-time monitoring from within a web browser.

Developing applications around Twitter is always fun thanks to its well-documented REST APIs. The Twitter notification plugin supports OAuth for authentication when accessing protected resources. It does all the magic behind the scenes, obtaining the OAuth access token and invoking the protected resource without any user intervention.

The next thing I developed was a messaging plugin based on RabbitMQ. RabbitMQ is a popular high-performance AMQP implementation built using Erlang. Using the RabbitMQ plugin, users can publish messages to and consume messages from a queue, and it supports a variety of topologies (fanout, direct, etc.). RabbitMQ provides a decent Java API to interface with the broker, and integrating it with Flux is a no-brainer. Now comes the interesting part: I need to show something so I can keep my long story short 🙂

Here is a simple Flux web designer screenshot of the Twitter plugin configured to send a direct message to a user:

Here is a simple Flux web designer screenshot of the RabbitMQ plugin configured to publish a message to a queue:

Here is a simple Flux web designer screenshot of the RabbitMQ plugin configured to consume messages from a queue:

The next thing you may want to do is try it out yourself. Maven users should include these dependencies in their POM.

<dependency>
  <groupId>flux</groupId>
  <artifactId>flux</artifactId>
  <version>7.10.1</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>rabbitmq-action</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>rabbitmq-trigger</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>twitter-action</artifactId>
  <version>1.0.0</version>
</dependency>

And, add the following repository to your repositories tag in your POM.

<repository>
  <id>maven2-repository.dev.java.net</id>
  <name>Java.net Repository for Maven</name>
  <url>http://download.java.net/maven/2/</url>
  <layout>default</layout>
</repository>

Non-Maven users can download the plugins from the project website. If you wish to use the Java APIs, here are some examples to get started. Complete examples are available in the source repository.

Publishing an AMQP message to a RabbitMQ channel would look something like this using Java API:

FlowChart flowChart = helper.makeFlowChart("RabbitMQ Action Example");
RabbitMQActionFactory customActionFactory = (RabbitMQActionFactory) flowChart.makeFactory("RabbitMQActionFactory");
RabbitMQAction rabbitMQAction = customActionFactory.makeRabbitMQAction("Publish AMQP Message");
rabbitMQAction.setHost("localhost");
rabbitMQAction.setUsername("guest");
rabbitMQAction.setPassword("guest");
rabbitMQAction.setExchangeName("nasdaq.exchange");
rabbitMQAction.setExchangeType("direct");
rabbitMQAction.setQueueName("nasdaq.request");
rabbitMQAction.setQueueType("shared");
rabbitMQAction.setRoutingKey("nasdaq.request");
rabbitMQAction.setMessage("BUY GOOG 200");

//Schedule this job
String name = engine.put(flowChart);

Consuming a message using RabbitMQTrigger which monitors for messages in the RabbitMQ queue would look something like this using Java API:

FlowChart flowChart = helper.makeFlowChart("RabbitMQ Trigger Example");
RabbitMQTriggerFactory customTriggerFactory = (RabbitMQTriggerFactory) flowChart.makeFactory("RabbitMQTriggerFactory");
RabbitMQTrigger rabbitMQTrigger = customTriggerFactory.makeRabbitMQTrigger("Consume AMQP Message");
rabbitMQTrigger.setHost("localhost");
rabbitMQTrigger.setUsername("guest");
rabbitMQTrigger.setPassword("guest");
rabbitMQTrigger.setQueueName("nasdaq.request");

JavaAction messageProcessor = flowChart.makeJavaAction("Message Processor");
messageProcessor.setListener(MessageProcessor.class);

rabbitMQTrigger.addFlow(messageProcessor);

//loop back required for polling
messageProcessor.addFlow(rabbitMQTrigger);

//Schedule this job
String name = engine.put(flowChart);

The Twitter plugin allows you to access OAuth-protected resources. For example, retrieving Twitter mentions would look something like this using the Java API:

FlowChart flowChart = helper.makeFlowChart("Twitter Action Example");
TwitterActionFactory customActionFactory = (TwitterActionFactory) flowChart.makeFactory("TwitterActionFactory");
TwitterAction twitterMentions = customActionFactory.makeTwitterAction("Twitter Action");
twitterMentions.setConsumerKey(CONSUMER_KEY);
twitterMentions.setConsumerSecret(CONSUMER_SECRET);
twitterMentions.setOAuthEnabled(true);
twitterMentions.setRequestMethod(RestActionType.GET);
twitterMentions.setResourceUrl("http://api.twitter.com/1/statuses/mentions.json");
twitterMentions.setUsername(TWITTER_USER);
twitterMentions.setPassword(TWITTER_PASS);

//Schedule this job
engine.put(flowChart);

For example, sending a direct message would look something like this using Java API:

TwitterAction directMessage = customActionFactory.makeTwitterAction("Twitter DM");
directMessage.setConsumerKey(CONSUMER_KEY);
directMessage.setConsumerSecret(CONSUMER_SECRET);
directMessage.setOAuthEnabled(true);
directMessage.setRequestMethod(RestActionType.POST);
directMessage.setResourceUrl("http://api.twitter.com/1/direct_messages/new.xml");
directMessage.setUsername(TWITTER_USER);

//add the required parameters
Properties properties = new Properties();
properties.put("screen_name", "fluxarul");
properties.put("user_id", "97133917");
properties.put("text", "Hello from Flux Twitter Action!");

directMessage.setParameters(properties);
directMessage.setPassword(TWITTER_PASS);

//Schedule this job
engine.put(flowChart);

The Flux plugins project is hosted at bitbucket.org. Feel free to check it out, and give me a shout if you have any feedback, or log an issue for bugs, improvements, or support for additional plugins.

A video is worth 10000 words! Download today and experience it yourself.

[Video: opsconsole]

Enjoy!

I recently came across this interesting question on job scheduling in Java on the widely popular experts-exchange website. Unfortunately, unlike Stack Overflow, it requires a subscription to see the “expert” answers. Here is my take on this FAQ:

1) Is it possible to use Sun’s own java.util.TimerTask for my complex report scheduling?

The Timer facility has limited capabilities compared to the JDK 5 concurrency utilities. This is best explained in Chapter 6 (6.2.5 Delayed and periodic tasks) of Java Concurrency in Practice by Brian Goetz, the best Java concurrency book on the planet. Fortunately, this chapter, Task Execution, is available as a sample chapter.

Here is the gist from the book (for the impatient):

a) A Timer creates only a single thread for executing timer tasks. If a timer task takes too long to run, the timing accuracy of other TimerTasks can suffer.

b) Another problem with Timer is that it behaves poorly if a TimerTask throws an unchecked exception. The Timer thread doesn’t catch the exception, so an unchecked exception thrown from a TimerTask terminates the timer thread. Timer also doesn’t resurrect the thread in this situation; instead, it erroneously assumes the entire Timer was cancelled. In this case, TimerTasks that are already scheduled but not yet executed are never run, and new tasks cannot be scheduled.
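The second point is easy to reproduce. The sketch below is my own small demonstration, not code from the book: a Timer stops accepting tasks once one of its tasks throws an unchecked exception, while a ScheduledExecutorService from the JDK 5 concurrency utilities keeps running later tasks. The class and method names are made up for illustration.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; uses only JDK APIs.
class TimerVsExecutorSketch {

    // Returns true once the Timer rejects new tasks after a task has thrown.
    static boolean timerDiesAfterUncheckedException() {
        Timer timer = new Timer(true); // daemon thread, so the JVM can still exit
        timer.schedule(new TimerTask() {
            @Override public void run() {
                // Kills the timer thread; the JVM prints the stack trace to stderr.
                throw new IllegalStateException("boom");
            }
        }, 0);
        long deadline = System.currentTimeMillis() + 5000;
        while (System.currentTimeMillis() < deadline) {
            try {
                timer.schedule(new TimerTask() {
                    @Override public void run() { }
                }, 60000);
            } catch (IllegalStateException timerAlreadyCancelled) {
                return true; // the Timer now behaves as if it had been cancelled
            }
            sleepQuietly(10);
        }
        return false;
    }

    // Returns how many follow-up tasks ran after an earlier task threw.
    static int executorKeepsRunningAfterFailure() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger completed = new AtomicInteger();
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        Runnable counting = completed::incrementAndGet;
        try {
            executor.schedule(failing, 0, TimeUnit.MILLISECONDS);
            // The failure above is captured in its Future; the worker thread survives.
            executor.schedule(counting, 10, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return completed.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Timer dead: " + timerDiesAfterUncheckedException());
        System.out.println("Executor follow-up runs: " + executorKeepsRunningAfterFailure());
    }
}
```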

2) What are the valid/strong reasons/limitations of java.util.TimerTask compared to other job scheduler frameworks? So that I myself have a strong belief/reason before choosing a third-party job scheduler framework.

I do not believe the Timer facility can be compared with job scheduling frameworks, as it does not provide features that are critical in a typical job scheduling environment. Here is what Flux offers (this is only a subset of the total feature set):

a) Time and Event based scheduling.
b) Agent/Agentless scheduling.
c) Built-in File monitoring or transfer (FTP, FTP over SSL, SFTP, UNC hosts).
d) Integration with JEE (JMS, EJB, Mail) and Web Services (SOAP, REST).
e) Built-in Error Handling and supports email notifications.
f) Logging and Audit Trail.
g) Web-based Designer, Swing-based Designer.
h) Operations Console (Realtime job monitoring, management and reporting).
i) Built-in Security using JAAS and supports integration with LDAP.
j) Automatic Clustering, Failover, and Load Balancing of Jobs.

For the complete feature set, please refer to the Flux manual.

3) There are a maximum of 100-200 Users in my application. In case, Users have scheduled reports in such a way at one time there are 100 report requests in the queue. How does the job scheduler framework OR java.util.TimerTask handle such scenarios? Do we have control over this?

In Flux, we have the concept of concurrency throttles, which allow users to control how many jobs can execute on the Flux engine at any given instant. You can define a concurrency throttle that governs a single Flux instance, or one that governs all jobs in all Flux instances on your cluster. These coarse-grained concurrency throttles control the parallel execution of large numbers of jobs in one step.

You can also define fine-grained concurrency throttles. For each branch in your job tree, you can define a unique concurrency throttle that governs only the jobs in that branch. More on this can be found in our manual.
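For readers new to the idea, here is a rough sketch of what a throttle does, written with plain JDK classes. It only illustrates the concept: it says nothing about how Flux implements concurrency throttles or how they are configured, and the class name, method name, and numbers are made up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a throttle; not Flux code.
class ThrottleSketch {

    // Runs jobCount dummy jobs with at most `limit` executing at once.
    // Returns { jobs completed, peak number of jobs running concurrently }.
    static int[] runThrottled(int jobCount, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        // Deliberately more threads than permits, so the semaphore is what throttles.
        ExecutorService pool = Executors.newFixedThreadPool(limit * 4);
        for (int i = 0; i < jobCount; i++) {
            pool.execute(() -> {
                try {
                    permits.acquire(); // wait for a free slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    Thread.sleep(2); // stand-in for generating a report
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    permits.release(); // free the slot for a queued job
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = runThrottled(100, 5);
        System.out.println("completed=" + result[0] + ", peak concurrency=" + result[1]);
    }
}
```

With 100 queued report requests and a limit of 5, all 100 jobs eventually run, but never more than 5 at the same time.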

4) At any time, Users are allowed to change their report schedules. Does the job scheduler framework support this?

In Flux, this can be achieved either through the Operations Console, which lets users reschedule their jobs from the web interface, or programmatically using the Java API. The Operations Console gives users advanced monitoring capabilities and allows them to view jobs as well as pause, resume, remove, interrupt, and expedite them.

5) Obviously, to run a report there are report inputs, that has to be passed to each report schedule. Do we have the flexibility/option in passing parameters to the job scheduler framework?

This is a very common requirement for any job scheduler, and Flux supports it through the concept of variable managers (Java API), which allow users to pass parameters to their jobs. Flux also has a neat feature called variable substitution, which can be quite powerful when designing jobs and externalizing certain server-specific information (such as host names, credentials, etc.) from the job.
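The general idea behind variable substitution can be sketched in a few lines of plain Java. Please treat this purely as an illustration: the ${name} placeholder notation, the class name, and the sample values are assumptions made for this example, and the Flux manual remains the reference for the actual syntax and behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; not how Flux implements variable substitution.
class SubstitutionSketch {

    // Matches placeholders of the assumed form ${name}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with its value; unknown names are left untouched.
    static String substitute(String template, Map<String, String> variables) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = variables.containsKey(name)
                ? variables.get(name)
                : matcher.group(); // keep the placeholder so the gap stays visible
            matcher.appendReplacement(result, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, String> variables = new HashMap<>();
        variables.put("host", "ftp.example.com"); // sample value
        variables.put("user", "reports");         // sample value
        System.out.println(substitute("sftp://${user}@${host}/out", variables));
    }
}
```

The job definition keeps only the placeholders, so the same job can move between environments by swapping the variable values.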

6) Which is the best way? Integrating job scheduler framework with web application or running it as a standalone?

Flux supports both approaches. Quite honestly, it depends on how you want to leverage the scheduling capabilities within your application. Embedding Flux within an application is a common use case, as Flux provides extensive Java APIs.

It takes less than 5 minutes to get Flux up and running with an in-memory database. Can you believe that? Give it a try yourself. Let me know what you think.
