Aloha from Hawaii! Shame on me to not blog for months. Well, honestly I have been quite busy during the past few months. With the big move to Hawaii in December and the recent launch of Floify kept me occupied. Although moving to a new place is fun, getting settled on an island can sometimes be more stressful. Now all that is taken care, there is no good reason to not blog from rainbow nation 🙂

Recently, I was working with an evaluator implementing a proof of concept where they had a requirement for scheduling workflows from a web service request. In Flux, this can be easily implemented as a trigger which typically waits for an event to occur. Flux supports popular triggers out of the box, which are either based on timer or database condition or file or audit trail or mail. While ago, I implemented a custom web service trigger which supports both SOAP and HTTP requests and expedites the workflow to the subsequent step. The sample testcase below shows how web service trigger can be implemented in your Flux workflows.

The sample client code that expedites the trigger is shown below.

With this plugin, your workflows can be designed to trigger based on web service requests and it also allows users to configure an embedded web server that runs part of the workflow. If this were one shot workflows, this nicely fits the bill. If the requirement were to support a recurring workflow, I would classify this approach as heavy weight mainly because it spins a web server for each run of your workflow which may not be ideal for high performance workflows. It makes more sense to reuse a single web server instance that accepts requests and triggers a workflow template from the repository. I do not believe this can be easily supported without making some core changes in Flux. But, it is not the end of the world, there is a more efficient way to implement this in Flux today by using Flux Cluster APIs.

Flux 7.11 Operations Console exposes set of APIs that allows clients to talk to Flux engine cluster via simple HTTP interface. In essence, Flux Opsconsole acts as a web service trigger for the cluster. You can find the documentation of these APIs here. The API that schedules a workflow template from repository will be available in 7.11.4. If you would like to try out this, you can request a 7.11.4 build from Flux support, we would be happy to send one on your way. The Operations Console deploys HTTP resources that can be accessed in a number of ways. If you were a Java shop, you would consider something similar shown in the Gist below. This sample uses Jersey APIs to POST a multipart request to the endpoint. You just need to toss in restaction.jar which is part of the Flux distribution.

In this example, we add three properties to a multipart request. The first one “template” is required and specifies the name of the template in the repository. The “category” and “rate” are optional variables that will be made available in your workflow instance. You can add as many data points that you would like to pass on to your workflow instance. You can also optionally customize the name of the workflow instance that you would like to spin off from the repository by setting the “namespace” property in the request.

There is another API that might interest you as well. This API schedules a given flow chart to the engine. The major difference here is your are exporting a workflow file to the engine instead of spinning an instance from existing repository template.

Enjoy developing in Flux and Mahalo for stopping by!

Possibly Related Posts:


Flux’s RestAction is the go-to construct for users orchestrating Flux workflows that involve web service integration. Be it HTTP-based services or SOAP-based services that implement HTTP binding, RestAction comes very handy for developers. Flux has a WebServiceAction which talks SOAP exclusively for services that implement WS-* such as Addressing, Security etc. RestAction has neat XPath integration, which allows users to navigate the response using XPath query and bind the result to domain model that can be seamlessly used in your workflow context.

In this post, let us look at a simple Weather Web Service which supports both SOAP and HTTP binding. I will be using Eric’s simple test framework to demonstrate the use of RestAction and WebServiceAction in your Flux workflow.

Let us see how to invoke the Weather web service using the traditional WebServiceAction that uses the WSDL. We pass the zip code as a parameter to GetCityForecastByZIP operation and the response is handled using a JavaAction. In this case the response is bound to ForecastReturn complex type.

package flux.test;

import com.cdyne.ws.weatherws.Forecast;
import com.cdyne.ws.weatherws.ForecastReturn;
import flux.*;
import org.junit.Test;

import java.net.URL;
import java.util.Properties;

public class WeatherWebServicesTest extends AbstractFluxTest {

 @Test
 public void testWebServiceAction() throws Exception {
   log.info("[START] FluxTest.testWebServiceAction");
   String namespace = "/FluxFlowChart";

   FlowChart flowChart = engineHelper.makeFlowChart(namespace);

   WebServiceAction webServiceAction = flowChart.makeWebService("GET Weather Info");

   // Specify the WSDL the web service action is to use.
   webServiceAction.setWsdl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx?WSDL"));

   // Specify the signature of the method to call.
   webServiceAction.setListenerSignature("GetCityForecastByZIP(String)");

   // Specify the arguments to pass to the method.
   webServiceAction.setArgument(0, "59102");

   JavaAction javaAction = flowChart.makeJavaAction("Java Action");
   javaAction.setListener(WebServiceActionListener.class);

   webServiceAction.addFlow(javaAction);

   engine.put(flowChart);

   waitForRuns(namespace, 1, 5, 30);

   log.info("[END] FluxTest.testWebServiceAction");
 }
}

The ActionListener implementation that processes the ForecastReturn from WebServiceAction.

Let us invoke the same operation using RestAction, which uses the HTTP binding. Here we set the query parameter “ZIP” and bind the response to ForecastReturn POJO (generated using: xjc -wsdl http://wsf.cdyne.com/WeatherWS/Weather.asmx?wsdl).

  @Test
  public void testRestActionGET() throws Exception {
    log.info("[START] FluxTest.testRestActionGET");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the Response type.
    restAction.setResponseType(ForecastReturn.class);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionGET");
  }

In this example, we are performing an XPath query that returns an array of Forecast elements.

  @Test
  public void testRestActionXPathArray() throws Exception {
    log.info("[START] FluxTest.testRestActionXPathArray");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    // Specify the XML namespace, if one defined
    Properties namespaces = new Properties();
    namespaces.put("ws", "http://ws.cdyne.com/WeatherWS/");
    restAction.setNamespaces(namespaces);

    // Specify the XPath expression using the defined namespace
    restAction.setXpathExpression("/ws:ForecastReturn/ws:ForecastResult/ws:Forecast");// this xpath returns an array of Forecast elements

    // Specify the XPath response type.
    restAction.setXpathResponseType(Forecast[].class);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionXPathArray");
  }

In this example, we are performing an XPath query that returns the first Forecast element from the array.

  @Test
  public void testRestActionXPathPOJO() throws Exception {
    log.info("[START] FluxTest.testRestActionXPathPOJO");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    // Specify the XML namespace, if one defined
    Properties namespaces = new Properties();
    namespaces.put("ws", "http://ws.cdyne.com/WeatherWS/");
    restAction.setNamespaces(namespaces);

    // Specify the XPath expression using the defined namespace
    restAction.setXpathExpression("/ws:ForecastReturn/ws:ForecastResult/ws:Forecast[1]");// this xpath returns the first element from the list

    // Specify the XPath response type.
    restAction.setXpathResponseType(Forecast.class);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionXPathPOJO");
  }

In this example, we are performing an XPath query that returns an attribute of ForecastReturn element.

  @Test
  public void testRestActionXPathAttribute() throws Exception {
    log.info("[START] FluxTest.testRestActionXPathAttribute");
    String namespace = "/FluxFlowChart";

    FlowChart flowChart = engineHelper.makeFlowChart(namespace);

    RestAction restAction = flowChart.makeRestAction("GET Weather Info");

    // Specify the URL the rest action is to use.
    restAction.setBaseUrl(new URL("http://wsf.cdyne.com/WeatherWS/Weather.asmx/GetCityForecastByZIP"));

    // Specify the Action type.
    restAction.setActionType(RestActionType.GET);

    // Specify the query parameters to be sent as part of the request.
    Properties queryParams = new Properties();
    queryParams.put("ZIP", "59102");

    restAction.setQueryParameters(queryParams);

    // Specify the XML namespace, if one defined
    Properties namespaces = new Properties();
    namespaces.put("ws", "http://ws.cdyne.com/WeatherWS/");
    restAction.setNamespaces(namespaces);

    // Specify the XPath expression using the defined namespace
    restAction.setXpathExpression("/ws:ForecastReturn/ws:City/text()");// this xpath returns the name of the city

    // Specify the XPath response type.
    restAction.setXpathResponseType(String.class);

    JavaAction javaAction = flowChart.makeJavaAction("Java Action");
    javaAction.setListener(RestActionListener.class);

    restAction.addFlow(javaAction);

    engine.put(flowChart);

    waitForRuns(namespace, 1, 5, 30);

    log.info("[END] FluxTest.testRestActionXPathAttribute");
  }

The ActionListener implementation that processes various results from the RestAction is shown below.

Here is a screenshot of the test output from Intellij.

You need to toss in the following jars from Flux distribution:

flux.jar
lib/restaction.jar
lib/webserviceaction.jar
lib/commons-logging-1.1.1.jar
examples/software_developers/web_services/lib/weatherws-generated.jar

Test dependencies include:

log4j-1.2.15.jar
junit-4.9.jar


Download the latest Flux 7.11 version here and check it out. The samples are available in flux-goodies bitbucket repository. You can also find more samples ($FLUX_HOME/examples/software_developers/web_services and $FLUX_HOME/examples/software_developers/rest_services) in the latest Flux 7.11.2 distribution. Enjoy!

Possibly Related Posts:


Automated SCP file transfers can be orchestrated in your Flux workflow using SCPAction. This action supports file uploads or downloads using SCP. SCP is basically a remote copy (rcp) through a SSH tunnel. It is easy to setup key-based SSH login for your systems, here is one article describing the steps for Mac. You could also run arbitrary system commands or scripts using this action.

Here is a screen shot showing SCPAction configuration that downloads logs from a linux host.

Here is a screen shot showing SCPAction configuration that executes a remote disk check command for a linux host.

A sample test case that shows how this action could be integrated within a Flux workflow using Java API.

Maven users could just drop this dependency in your POM to use this plugin in your project.

<dependency>
  <groupId>com.fluxcorp.plugins</groupId>
  <artifactId>scp-action</artifactId>
  <version>1.0.3.SNAPSHOT</version>
</dependency>

Make sure you include this maven repository where the plugin snapshots are deployed.

<repository>
  <id>sonatype-nexus-snapshots</id>
  <name>sonatype-nexus-snapshots</name>
  <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>

There are few more interesting plugins available, you can check the project wiki for more details. Download the latest Flux 7.11 version and check out these cool plugins. Enjoy!

Possibly Related Posts:


One of our customer was interested in performing a basic HTTP file transfer function using Flux APIs. I implemented a trivial HTTP file transfer Action in Flux plugins project. Here is a sample test case that shows how this action could be integrated within a Flux workflow.

Maven users could just drop this dependency in your POM to use this plugin in your project.

<dependency>
  <groupId>com.fluxcorp.plugins</groupId>
  <artifactId>http-filetransfer-action</artifactId>
  <version>1.0.2.SNAPSHOT</version>
</dependency>

Make sure you include this maven repository where the plugin snapshots are deployed.

<repository>
  <id>sonatype-nexus-snapshots</id>
  <name>sonatype-nexus-snapshots</name>
  <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>

There are few more interesting plugins available, you can check the project wiki for more details. Enjoy!

Possibly Related Posts:


Note: If you planning to co-locate JDK 5 alongside JDK 6 on Mac OS X Lion 10.7, refer this post. It worked like a charm on the recent 10.7.3

I recently installed Java for Mac OS X 10.6 Update 4 from Mac Software Update. This update basically installs 1.6.0_24 that contains fixes for security vulnerabilities from 1.6.0_22. I hesitated to apply this update for a while as I did not want to invite any trouble to my existing JDK 5 environment that was co-located with JDK 6 (shipped with Snow Leopard). We use JDK 5 to build Flux 7.10 as it requires 1.5. Now that I am mostly into Flux 7.11 development, which requires JDK 6, I switch back and forth on need basis.

I believe JDK 5 was not officially distributed in Snow Leopard, but JDK 5 was still popular and widely used when Snow Leopard first arrived. JDK 5 was officially retired for quite some time now, and I believe it is widely used in many production environments today. I had to find a way to have both 1.5 and 1.6 co-located on my development environment, and there were so many experiences shared by Java developers across the spectrum. The one which came to my rescue was from OneSwarm. It worked for me just fine and I have been living with that until I decided to upgrade to the latest Java update earlier this month. I knew I was going to mess with my development environment, but not a huge risk though. The upgrade went just fine and as expected the upgrade ripped off my JDK 1.5 and set the system Java preferences to JDK 1.6. I was not worried too much as I was using JDK 6 for Flux 7.11 development. So, I did not bother to look at setting up JDK 5 at that moment.

Today, I had to build Flux 7.10, so I had no other option than fixing my JDK mess. I had to dig through my notes to redo those steps. I wanted to share that experience so if any one runs into a similar requirement, you have a decent solution and update to the latest available JDK 1.5 distribution from Apple that is included in Mac OS X 10.5 updates.

Here are the three simple steps:

1. Download “Java for Mac OS X 10.5 Update 9” from Apple.

2. Assuming all your JDK versions point to your CurrentJDK which is JDK 1.6 under “/System/Library/Frameworks/JavaVM.framework/Versions/”.

Here is how mine looks:

Now, follow these instructions carefully:

cd /System/Library/Frameworks/JavaVM.framework/Versions/
sudo rm 1.5.0

3. You may want to install Pacifist (shareware app), which is handy for extracting OS X packages. Unlike other similar tools, this one allows users to select individual files and folders from the package to be extracted.

Right click on the DMG file (JavaForMacOSX10.5Update9.dmg) that you downloaded in Step 1 and select Open With -> Pacifist. Once the Pacifist opens up, just navigate to System -> Library -> Frameworks -> JavaVM.framework -> Versions -> 1.5.0 in the directory tree and right click on 1.5.0 directory and select “Install to Default Location”.

Here is a screen shot of this step:

After JDK 1.5.0 directory is extracted to the default location, the symlinks under “/System/Library/Frameworks/JavaVM.framework/Versions/” would look different, here is how mine looks after the update:

This shows that JDK 1.5.0 is no longer pointing to your CurrentJDK (JDK 1.6). You may want to reset the symlinks to point to the new 1.5.0 directory.

sudo rm 1.5
sudo ln -s 1.5.0 1.5

Now, you can point your JAVA_HOME to JDK 1.5.0 by setting the JAVA_HOME. For example,

export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.5/Home

java -version should print:

Voila! JDK 5 is ready to rock on your Mac along side JDK 6. Just switch between the latest and greatest versions you need.

Possibly Related Posts:


Recently I started developing some custom Flux plugins I wanted to share with Flux users. It is always fun to develop custom plugins and use them within a Flux workflow. Our users are more used to the concept of custom actions and triggers. I believe plugin is a more generic and widely used terminology these days. Flux 7.10 ships over 50 built-in plugins (actions/triggers) which covers most of the common customer requirements (ex: TimerTrigger, ProcessAction, MailAction, Split, Join, etc and a slew of File based triggers and actions). Writing your own is a child’s play as Flux uses plain-old JavaBean framework to build custom actions/triggers that can be used seamlessly in your workflows. Some of our customers write their own based on their requirements and sometimes we quickly develop one for them. You just need a Java API (Ex: RabbitMQ integration) or an HTTP interface (Ex: Twitter integration) to build one yourself.

Custom plugins make it lot easier to manage your workflow integration points with external systems and enjoy load of Flux benefits like scheduling, file transfer, error handling, variable substitution, runtime data mapping, job dependencies, and the most important real-time monitoring from within a web browser.

Developing applications around Twitter is always fun with their well documented REST APIs. Twitter notification plugin supports OAuth for authentication when accessing protected resources and does all the magic behind the scenes without any user intervention for getting OAuth access token and invoking the protected resource.

The next thing I developed was a messaging plugin based on RabbitMQ. RabbitMQ is a popular high-performant AMQP implementation built using erlang. Using RabbitMQ plugin, users can publish and consume messages from its queue and it supports variety of topologies (fanout, direct, etc). RabbitMQ provides a decent Java API to interface with the broker and integrating it with Flux is no brainer. Now comes the interesting part, I need to show something so I can keep my long story short 🙂

Here is a simple Flux web designer screenshot of Twitter plugin configured to send a direct message to a user:

Here is a simple Flux web designer screenshot of RabbitMQ plugin configured to publish message to a queue:.

Here is a simple Flux web designer screenshot of Twitter plugin configured to consumes messages from a queue:

The next thing you may want to do is to try out yourself. Maven users should include these dependencies in your POM.

<dependency>
  <groupId>flux</groupId>
  <artifactId>flux</artifactId>
  <version>7.10.1</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>rabbitmq-action</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>rabbitmq-trigger</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>flux</groupId>
  <artifactId>twitter-action</artifactId>
  <version>1.0.0</version>
</dependency>

And, add the following repository to your repositories tag in your POM.

<repository>
  <id>maven2-repository.dev.java.net</id>
  <name>Java.net Repository for Maven</name>
  <url>http://download.java.net/maven/2/</url>
  <layout>default</layout>
</repository>

Non-maven users can download it from the project website. If you wish to use the Java APIs, here are some examples to get started. Complete examples are available in the source repository.

Publishing an AMQP message to a RabbitMQ channel would look something like this using Java API:

FlowChart flowChart = helper.makeFlowChart("RabbitMQ Action Example");
RabbitMQActionFactory customActionFactory = (RabbitMQActionFactory) flowChart.makeFactory("RabbitMQActionFactory");
RabbitMQAction rabbitMQAction = customActionFactory.makeRabbitMQAction("Publish AMQP Message");
rabbitMQAction.setHost("localhost");
rabbitMQAction.setUsername("guest");
rabbitMQAction.setPassword("guest");
rabbitMQAction.setExchangeName("nasdaq.exchange");
rabbitMQAction.setExchangeType("direct");
rabbitMQAction.setQueueName("nasdaq.request");
rabbitMQAction.setQueueType("shared");
rabbitMQAction.setRoutingKey("nasdaq.request");
rabbitMQAction.setMessage("BUY GOOG 200");

//Schedule this job
String name = engine.put(flowChart);

Consuming a message using RabbitMQTrigger which monitors for messages in the RabbitMQ queue would look something like this using Java API:

FlowChart flowChart = helper.makeFlowChart("RabbitMQ Trigger Example");
RabbitMQTriggerFactory customTriggerFactory = (RabbitMQTriggerFactory) flowChart.makeFactory("RabbitMQTriggerFactory");
RabbitMQTrigger rabbitMQTrigger = customTriggerFactory.makeRabbitMQTrigger("Consume AMQP Message");
rabbitMQTrigger.setHost("localhost");
rabbitMQTrigger.setUsername("guest");
rabbitMQTrigger.setPassword("guest");
rabbitMQTrigger.setQueueName("nasdaq.request");

JavaAction messageProcessor = flowChart.makeJavaAction("Message Processor");
messageProcessor.setListener(MessageProcessor.class);

rabbitMQTrigger.addFlow(messageProcessor);

//loop back required for polling
messageProcessor.addFlow(rabbitMQTrigger);

//Schedule this job
String name = engine.put(flowChart);

Twitter plugin allows to access OAuth protected resources. For example, retrieving twitter mentions would look something like this using Java API:

FlowChart flowChart = helper.makeFlowChart("Twitter Action Example");
TwitterActionFactory customActionFactory = (TwitterActionFactory) flowChart.makeFactory("TwitterActionFactory");
TwitterAction twitterMentions = customActionFactory.makeTwitterAction("Twitter Action");
twitterMentions.setConsumerKey(CONSUMER_KEY);
twitterMentions.setConsumerSecret(CONSUMER_SECRET);
twitterMentions.setOAuthEnabled(true);
twitterMentions.setRequestMethod(RestActionType.GET);
twitterMentions.setResourceUrl("http://api.twitter.com/1/statuses/mentions.json");
twitterMentions.setUsername(TWITTER_USER);
twitterMentions.setPassword(TWITTER_PASS);

//Schedule this job
engine.put(flowChart);

For example, sending a direct message would look something like this using Java API:

TwitterAction directMessage = customActionFactory.makeTwitterAction("Twitter DM");
directMessage.setConsumerKey(CONSUMER_KEY);
directMessage.setConsumerSecret(CONSUMER_SECRET);
directMessage.setOAuthEnabled(true);
directMessage.setRequestMethod(RestActionType.POST);
directMessage.setResourceUrl("http://api.twitter.com/1/direct_messages/new.xml");
directMessage.setUsername(TWITTER_USER);

//add the required parameters
Properties properties = new Properties();
properties.put("screen_name", "fluxarul");
properties.put("user_id", "97133917");
properties.put("text", "Hello from Flux Twitter Action!");

directMessage.setParameters(properties);
directMessage.setPassword(TWITTER_PASS);

//Schedule this job
engine.put(flowChart);

The Flux plugins project is hosted at bitbucket.org. Feel free to check it out, shout should you have any feedback or log an issue for bugs/improvements or support for additional plugins.

Possibly Related Posts:


In Flux, we have made improvements in recent versions to handle database or network failures and recover from such failures gracefully without needing to restart Flux. Sometimes, it would be nice to notify administrators about these failures and they can act upon them in case of an unscheduled outage. Flux is dependent on the database for maintaining job states and schedules, so fixing this problem is critical.

BoneCP, a popular JDBC Connection Pool has an interesting and useful feature which allows the connection pool to automatically recover from such DB outages and it replays the transactions when a healthy connection becomes available. This feature became available since its 0.6.5 release and I had tested this before and found this to be very useful when configured with Flux.

In BoneCP, we can implement a connection hook which gets triggered when database failures occur and it is easy to configure in Flux to use BoneCP as the datasource pool provider. In case of any database failure, you may want to send an email notification or initiate a SNMP trap so other downstream systems can be handled accordingly.

Here is a basic DatabaseShutdownHook would look like:

import com.jolbox.bonecp.ConnectionHandle;
import com.jolbox.bonecp.hooks.AbstractConnectionHook;
import com.jolbox.bonecp.hooks.AcquireFailConfig;

import java.util.Date;

public class DatabaseShutdownHook extends AbstractConnectionHook {
    @Override
    public boolean onConnectionException(ConnectionHandle connection, String state, Throwable t) {
        // handle notifications here: SNMP or SMTP
        System.out.println("Database down at " + new Date());
        return super.onConnectionException(connection, state, t);
    }

    @Override
    public boolean onAcquireFail(Throwable t, AcquireFailConfig acquireConfig) {
        // handle notifications here: SNMP or SMTP
        System.out.println("Failure to acquire connection at " + new Date() + ". Retry attempts remaining : " + acquireConfig.getAcquireRetryAttempts());
        return super.onAcquireFail(t, acquireConfig);
    }

}

Let us now see how to configure BoneCP as a Data source in Flux.

import com.jolbox.bonecp.BoneCPDataSource;
import flux.Configuration;
import flux.DatabaseType;
import flux.Engine;
import flux.Factory;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.sql.Connection;
import java.sql.SQLException;

public class FluxEngine {

    private static Context initialContext;

    static {
        try {
            System.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.naming.java.javaURLContextFactory");
            System.setProperty(Context.URL_PKG_PREFIXES, "org.apache.naming");
            initialContext = new InitialContext();
            initializeDataSource();
        } catch (NamingException e) {
            // log exception
        } catch (SQLException e) {
            // log exception
        }
    }

    public static void initializeDataSource()  throws NamingException, SQLException {
        BoneCPDataSource ds = new BoneCPDataSource();
        ds.setJdbcUrl("jdbc:mysql://localhost:3306/flux710?relaxAutoCommit=true");
        ds.setUsername("flux");
        ds.setPassword("secret");
        ds.setMinConnectionsPerPartition(10);
        ds.setMaxConnectionsPerPartition(50);
        ds.setPartitionCount(1);
        ds.setConnectionHook(new DatabaseShutdownHook());// Required only if you need notifications.
        ds.setTransactionRecoveryEnabled(true);// Important: This should be enabled
        ds.setAcquireRetryAttempts(10);//default is 5
        ds.setAcquireRetryDelay(10000);// default is 7 secs
        ds.setReleaseHelperThreads(5);

        Connection con = ds.getConnection();
        if (con != null) {
            initialContext.rebind("FluxDataSource", ds);
            con.close();
        }
        System.out.println("DataSource configured.");
    }

    public static void main(String[] args) throws Exception {
        Factory f = Factory.makeInstance();
        Configuration c = f.makeConfiguration();
        c.setDatabaseType(DatabaseType.MYSQL);
        c.setDataSource("FluxDataSource");
        Engine engine = f.makeEngine(c);
        engine.start();
        System.out.println("Engine started.");
    }
}

Data sources should be made available via JNDI for Flux to use them. So, in this example I used Tomcat JNDI support to expose the BoneCP datasource via JNDI. There are also other ways to expose this via JNDI when running Flux as a standalone server. Tomcat JNDI was easy to configure though as you can see from the code above. The BoneCP data source should be configured for transaction recovery and you can set the number of recovery attempts and retry delay.

I have created a simple flow chart which has a Timer Trigger followed by a Java Action. The timer is configured to fire every 15 seconds for 5 times as shown below.

process_data.png

Here is a sample output of running this job in Flux configured with BoneCP and MySQL database. I shutdown MySQL server when this job was running and you can see the BoneCP recovery attempts and after a while I brought back the MySQL server and BoneCP successfully recovered. Now, you can see Flux execute the last occurrence of this job successfully.

DataSource configured.
Engine started.
Done processing data

Done processing data

Done processing data

Done processing data

Database down at Sun Oct 10 12:18:12 MDT 2010
Oct 10, 2010 12:18:12 PM com.jolbox.bonecp.ConnectionHandle markPossiblyBroken
SEVERE: Database access problem. Killing off all remaining connections in the connection pool. SQL State = 08007
Oct 10, 2010 12:18:12 PM com.jolbox.bonecp.MemorizeTransactionProxy invoke
SEVERE: Connection failed. Attempting to recover transaction on Thread #70
Oct 10, 2010 12:18:14 PM com.jolbox.bonecp.hooks.AbstractConnectionHook onAcquireFail
SEVERE: Failed to acquire connection Sleeping for 10000ms and trying again. Attempts left: 10. Exception: java.net.ConnectException: Connection refused: connect
Failure to acquire connection at Sun Oct 10 12:18:14 MDT 2010. Retry attempts remaining : 10
Oct 10, 2010 12:18:26 PM com.jolbox.bonecp.hooks.AbstractConnectionHook onAcquireFail
SEVERE: Failed to acquire connection Sleeping for 10000ms and trying again. Attempts left: 9. Exception: java.net.ConnectException: Connection refused: connect
Failure to acquire connection at Sun Oct 10 12:18:26 MDT 2010. Retry attempts remaining : 9
Oct 10, 2010 12:18:38 PM com.jolbox.bonecp.hooks.AbstractConnectionHook onAcquireFail
SEVERE: Failed to acquire connection Sleeping for 10000ms and trying again. Attempts left: 8. Exception: java.net.ConnectException: Connection refused: connect
Failure to acquire connection at Sun Oct 10 12:18:38 MDT 2010. Retry attempts remaining : 8
Oct 10, 2010 12:18:48 PM com.jolbox.bonecp.MemorizeTransactionProxy invoke
SEVERE: Recovery succeeded on Thread #70
Done processing data

The following BoneCP dependencies are required in the classpath:
bonecp-0.7.0.jar
guava-r07.jar
slf4j-api-1.6.1.jar
slf4j-jdk14-1.6.1.jar (Note: You can use any logger bindings supported by slf4j. Using JDK logger for simplicity and one less jar.)

To configure Tomcat JNDI, you need to have these jars in classpath:
catalina.jar
tomcat-juli.jar

Let me know if you have any trouble setting up this in Flux.

(Update: 10/13): Updated to reference the latest stable 0.7.0 release which now uses Google guava instead of the retired Google collections library and fixed slf4j dependency requirement.

Possibly Related Posts:


A video is worth 10000 words! Download today and experience it yourself.

opsconsole

Enjoy!

Possibly Related Posts: