Datasports on Software Development

Articles and updates from Datasports about the craft of software

Datasports DirectoryWatcher Adapter

with one comment

1. Overview

NOTE: This article documents a component that is available via the StreamBase Component Exchange (SBX).

The Datasports DirectoryWatcher adapter is a general-purpose solution to perform a common task – watch a directory structure and raise events (in the form of StreamBase tuples) when changes occur in that directory structure. These changes include creation, update, deletion, and re-naming of files and directories.

The adapter makes use of the JNotify library, a cross-platform Java library released under the GNU Library or Lesser General Public License (LGPL) which uses a set of native libs to hook into the OS on Windows, Linux, and Mac OS.

This article will document the creation and usage of the adapter. The source for the adapter and a test application which uses it is available from the StreamBase Component Exchange. To load it, choose File | StreamBase Component Exchange…, and type DirectoryWatcher in the “Find in Title” field. Check the checkbox beside the title of the project, and choose Finish. This will import the project with all source into your current StreamBase Studio Workspace.

Importing the DirectoryWatcher project

Importing the DirectoryWatcher project

2. Notes on Creating an Adapter

The Adapter wizard provided by StreamBase is very helpful, but it does generate some misleading and unnecessary code, as well as leaving out some helpful functionality or explanation. This section will discuss some tips on how to write an adapter and work around some of the oddities of the wizard-generated code.

2.1 Ports and Schemas

To add an adapter to a StreamBase project, we use the wizard by choosing File | New | StreamBase Embedded Adapter. From there, we can specify some high-level information about the adapter, including the type, Java package, name, port count, and properties:

The wizard assumes that an input adapter will not have any input ports, and similarly that an output adapter will not have any output ports. There are good reasons why you might want an input port on an input adapter (such as for run-time control over the adapter’s behavior), or an output port on an output adapter (such as for logging, publishing status of the outbound connection, etc.), and to accomplish either you need to write some code of your own. This section will discuss adding an optional input port to an input adapter, and validating the schema of that input port.

Also, the wizard creates a property of type Schema, named schema0, and does not give you a means to specify what the schema of your output port will be. This section will also discuss how to correct this in your code.

2.1.1 Input Port and Schema Validation

A good practice is to use named constants for the number and identities of the input and output ports, as well as for the fields in your input and output schemas. In the file, we find code such as the following:

private static final int INPUT_PORT_COUNT = 1;
private static final int OUTPUT_PORT_COUNT = 1;

private static final int ADAPTER_CONTROL_PORT = 0;
private Schema adapterControlSchema;
private static class ADAPTER_CONTROL_SCHEMA 
	public static String Enabled = "Enabled";
	public static String WatchSubDirs = "WatchSubDirs";
	public static String DirectoryPath = "DirectoryPath";
	public static String WatchEvents = "WatchEvents";

These constants and the adapterControlSchema member variable will be used to specify the input schema in code.

The wizard-generated code will add a call to setPortHints() in the constructor, and a related call to requireInputPortCount() in the typecheck() method. In both cases, the port counts are hard-coded to values entered or assumed in the wizard. In the Datasports DirectoryWatcher class, we see that the code has been changed to use our named constants, as well as to specify the input and output schemas, to validate the input schema, and apply conditional logic based on the Use Control Port property.

The Datasports DirectoryWatcher adapter can be configured through its Use Control Port property to have 0 or 1 input ports, and the way that this works is clear from the code. Since the default value of the Use Control Port property (bound to the _controlPort member variable) is false, the constructor calls setPortHints() specifying 0 as the number of input ports. The logic around adding and removing the input port occurs in 2 other places.

In typecheck(), we specify the number of input ports in the call to requireInputPortCount() based on the value of _controlPort, and only call SchemaValidator.validateSchema() in the case where the adapter is configured to actually have an input port. This is a helper function I wrote and included with this sample – the source is straightforward, have a look at it and ask any questions in the comments section below. If an input is connected without the specified fields, the application will show a formatted typecheck error that includes helpful information to address the problem.

Thirdly, we implement getPortCounts() in the DirectoryWatcher class. This method is called by StreamBase Studio to determine how many input and output ports to render for the adapter on the canvas.

The last change that must be applied is the addition of a processTuple() method, which has the following signature:

public void processTuple(int inputPort, Tuple input) throws StreamBaseException

This method is the entry point for tuples delivered to your component via an incoming stream. All logic related to processing tuples arriving on an input port happens in this method.

2.1.2 Output Port and Schema Specification

The adapter as generated by the wizard includes a property of type Schema, named schema0. The inclusion of this property results in the inclusion of a schema specification UI on the StreamBase Properties tab for the adapter, and it is this schema which is used in code to specify the output schema of the adapter. This is problematic since it would require every user of the adapter to know the output schema and enter it correctly on the adapter instance’s property page.

To correct this, we first remove references to the schema0 property – they are found in the class’ member variables, typecheck(), getSchema0() and setSchema0() methods, and in the adapter’s BeanInfo class. The wizard generates 2 classes – the adapter class, plus a BeanInfo class, which is named by appending BeanInfo to the name. So if we specify TestAdapter as the adapter name, we will have and

The properties specified in the BeanInfo class are mapped to public get and set methods via reflection. The details of how this works at run time is not important, but looking at the code generated by the wizard, and at the supplied Datasports DirectoryWatcher sample shows that the property names found in the BeanInfo class correspond to get and set methods in the adapter class.

To specify an output schema, we create an instance of a Schema object in the adapter class’ constructor, and use it in the call to setOutputSchema() in the typecheck() method. This ensures that all downstream components will pick up the correct schema definition for instances of the adapter.

2.2 Properties

As noted above in Section 2.1.2 Output Port and Schema Specification, properties are specified and implemented via as many as 4 elements: a member variable in the adapter class, a get/set method pair also in the adapter class, and an entry in the BeanInfo class’ PropertyDescriptor array. In addition, some properties will have enablement logic stubs provided as well, a public boolean method with the property name prefixed with shouldEnable. Experiment with adding properties via the wizard, and adding and removing properties through code.

NOTE: Because the property binding works via reflection, sometimes changes made in code will not show up correctly in the StreamBase Properties tab for an adapter instance unless you do a full rebuild, via Project | Clean…, followed by a manual build via Project | Build Project. In fact, in some instances, it may be necessary to close and re-open StreamBase Studio to force all changes to take effect. If you have made changes that affect Properties or the number of input or output ports and are seeing unexpected behavior related to those changes, re-start Studio as part of your troubleshooting process.

There are some other items to keep in mind when working with adapter or operator properties in StreamBase.

2.2.1 Setting Defaults

It’s not clear how to specify a blank string in the wizard to use as the default value of a property. Once the wizard has completed, have a look in the adapter class’ constructor to see what literal or constant values are used to initialize properties to default values.

2.2.2 “Expression on port” Properties

There is an option to associate a property with an expression on a specified port. I haven’t been able to figure out what this means or how to make it work – it might be useful but it’s not clear how. It’s possible that proper use of this functionality would be better behavior than the control port implemented in the Datasports DirectoryWatcher adapter.

NOTE: If you do check the “Expression on port” checkbox in the “New Adapter Property” dialog, it can have some non-obvious side effects. Specifying this when adding a property means that the BeanInfo class will use the SBExpressionPropertyDescriptor type rather than SBPropertyDescriptor, and SBExpressionPropertyDescriptor indicates a property that is set before each call to processTuple(), and not before. So if your adapter needs to do any processing that depends on the proper initialization of its properties before it is guaranteed to receive a tuple on an input port, then you may not use the “Expression on port” setting for those properties.

2.2.3 Enablement Logic

In some cases, you may have a property that is only valid when another property has a particular value. For example, an adapter could have an IncludeTimestamp property, and an optional TimestampFormat property. The TimestampFormat property is only meaningful if IncludeTimestamp is set to TRUE. Enablement logic allows you to implement the code to enable and disable properties in the StreamBase Studio UI. In addition to the get/set methods, the wizard generates a shouldEnable method for each property, and this method is also bound to the property via reflection. If you don’t need custom enablement logic, just delete the method.

2.2.4 Custom Property Widget

For some properties, it may make sense to use a richer or custom UI for setting it. When adding properties via the wizard, there is an option to specify a custom UI widget. This option is beyond the scope of this article.

2.3 Main Loop

An input adapter can (and generally should) run in its own thread. There is a setting for the threading model on the second page of the wizard called “Use a background thread”, and the default value is true.

That thread is started in the StreamBase container when the adapter is instantiated, and it enters the adapter instance via the run() method. The thread terminates (and the adapter stops all processing) when the run() method returns. While running, the adapter is responsible for periodically calling shouldRun() to indicate that it remains alive, and to learn when it should stop.

The wizard generates an adapter with a run() method that does not make either of these things clear. The generated run() implementation simply calls fillAndOutputTuple(), which will run until interrupted, sending a tuple out every 250ms. The run() implementation of DirectoryWatcher perhaps makes the correct logic more clear:

public void run() {
	if (shouldRun())
	try {
		while (shouldRun())
	} catch (InterruptedException e) {
		logger.debug("sleep interrupted, shutting down.");

Code to generate and send tuples should be called in response to the receipt of messages or events that the adapter is processing (JNotify callbacks in the case of the Datasports DirectoryWatcher adapter).

3. JNotify Library

NOTE: if you choose to use the Datasports DirectoryWatcher adapter in any commercial product, it is your responsibility to ensure that you are not violating the licensing terms for JNotify (found in the file jnotify-license.txt).

The bulk of the work in the Datasports DirectoryWatcher adapter is done with the JNotify library. Documentation for this library can be found at:

Because the OS-level operations to monitor a folder in the file system are not possible through standard Java, JNotify uses a set of native libraries to work on Windows (32 and 64 bit), Linux, and Mac OS. Adding a Java library with native dependencies is not straightforward for those of us not from a Java background, or unfamiliar with Eclipse (on which StreamBase Studio is based).

To add the reference, we go through 3 steps:

  1. Import the folder containing the native libraries into the project by choosing File | Import…, and selecting File System, and navigating to the folder containing the native libraries. This will create a copy of the folder and its selected contents under the root folder of your project.
  2. Add the JAR file by choosing Project | Properties | Java Build Path | Libraries | Add External JARs…, and select the JAR file for the library to add. This will create a copy of the JAR in the lib folder, and will add a reference to it.
  3. Still under the Libraries view, expand the entry for the added JAR, and select “Native library location”, and then press the “Edit…” button. In the dialog that appears, press the “Workspace” button and navigate to the folder you imported into your project in Step 2.

3.1 JNotifyListener Implementation

The key to receiving notifications from JNotify is the implementation of the JNotifyListener interface. Looking at the definition of the DirectoryWatcher adapter class, we see JNotifyListener in the implements clause:

public class DirectoryWatcher extends InputAdapter
implements Parameterizable, Runnable, JNotifyListener {

This interface consists of 4 methods: fileCreated(), fileDeleted(), fileModified(), and fileRenamed(). The implementations of those methods handle the events from JNotify and repackage them as tuples to be sent out via the output port, using the private sendEvent() method.

4. Control Interface

As noted above, the Datasports DirectoryWatcher adapter has an optional input port. This port is enabled or disabled via the Use Control Port property.

This input port’s schema represents a control interface used to change or configure the adapter’s behavior at run-time, as well as to enable and disable it. The input tuple includes fields which correspond to the adapter’s properties (except Use Control Port), and specifying a value for a field updates that property. Any field may be left null. After processing all non-null fields from the input tuple, the changes are applied and the JNotify watch is updated.

Using this pattern provides a clear mechanism for a configuration management subsystem to retrieve adapter and operator configuration values from some source of configuration information (CSV file, XML file, DB, webservice, etc.), as well as for an administrative or control console UI to make run-time changes to a system’s operation. Where such a mechanism is not required, set Use Control Port to false and set all properties in StreamBase Studio at design time.

NOTE: if you set Use Control Port to false, then you must set Enabled to true, otherwise there will be no way for the adapter to ever start watching for updates. If Use Control Port is true, then the adapter can be started and stopped at run time via tuples on the input port.

The root of all tuple processing occurs in the processTuple() method. Any adapter or operator which accepts tuples must implement this method. In the case of the Datasports DirectoryWatcher, processTuple() has a simple implementation – the tuple is parsed for property values, which are applied if they are found.

5. Usage

The Datasports DirectoryWatcher adapter has 5 properties:

  1. (boolean) Use Control Port: defaults to false. Set this property to true to enable the input port used to control the adapter’s operation at run-time.
  2. (boolean) Enabled: defaults to true. Set this property to true to enable the adapter so that tuples will be sent when the watched directory is changed.
  3. (boolean) Watch Sub Directories: defaults to true. Specifies whether or not to trigger events in the sub directories below the root directory path. Note that on some OS’s, changes in a sub-folder are notified as an UPDATED event on the containing folder.
  4. (String) Directory Path: no default value. The root directory to watch.
  5. (String) Watch Events: defaults to all events. A comma-separated list of events to which to subscribe. Set this field to blank to subscribe to all events, or pick a subset from:
    1. CREATED
    2. UPDATED
    3. DELETED
    4. RENAMED

Note that different applications and OS’s will result in a different subset of events raised in response to what may appear to be one operation. For example, under Windows, creating a new file under a sub directory via the New | Text File operation in the Explorer right-click context menu results in the following:

  1. CREATED event for the file
  2. UPDATED event for the containing folder
  3. RENAMED event for the file when the user enters a new name in Explorer and presses enter
  4. UPDATED event for the containing folder
  5. UPDATED event for the renamed file

To use this adapter properly, some experimentation should be done on all relevant platforms to see the pattern of events that will be raised in response to the conditions of interest. In general, careful setting of the Watch Events property will be required, and so will some downstream business logic to filter or conflate the events.

The tuples sent out from the Datasports DirectoryWatcher adapter contain the following fields:

  1. (String) EventID: string literal corresponding to the event that occurred. Valid values are the same as for the Watch Events property discussed above.
  2. (String) DirectoryPath: same as the Directory Path property. Guaranteed to end with a separator character. This represents the root directory being watched.
  3. (String) FileName: The name of the file or folder associated with the event. It is safe to append this value to the value in DirectoryPath to yeild the complete path to the affected file.
  4. (String) OldFileName: The old name of a renamed file or folder. This will be blank for any event other than RENAMED.

6. Conclusion

In this article, we discussed how to create an input adapter using a combination of the adapter wizard’s user interface, generated code, modified code, a utility class, and external JAR with a dependency on a native library. The sample covers processing inbound tuples, and generating outbound tuples in response to file system events as reported by JNotify.

It is my hope that this article is somehow interesting or relevant to members of the StreamBase development community, and that it (and future articles) will spark some discussion around best practices and design and implementation of StreamBase solutions.

You are free to use the Datasports DirectoryWatcher adapter in whole or in part however you choose, but please ensure that you respect the terms of the JNotify license (found in the file jnotify-license.txt). Also, please take some time to post your comments, questions, or suggestions on this blog.

Phil Martin
Datasports Inc.


Written by datasports

Sep 27, 2010 at 5:06 PM

Posted in Adapters, SBX, Tutorials

One Response

Subscribe to comments with RSS.

  1. […] of what’s involved in writing an adapter, you can also have a look at my tutorials for the Datasports DirectoryWatcher Adapter and the Datasports WebServiceListener Adapter. NOTE: Do not turn your evaluation of the […]

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: