Datasports on Software Development

Articles and updates from Datasports about the craft of software

Thoughts on Encapsulation Schemes, Part 2

with 3 comments

1. Introduction

As discussed in Part 1 of this article, StreamBase provides a variety of hosting mechanisms for .sbapp modules. Each of these mechanisms has its own set of options related to synchronicity and concurrency, which will be the focus of this article.

A big part of the win with the visual editing of StreamBase EventFlow logic is the ease with which threading models can be specified, modified, and tuned. In general, little more is required than to select a component or module and specify some concurrency parameters. Moving logic into or out of different processing threads is as simple as moving logic into and out of modules. In my experience, this is a huge productivity gain over traditional programming environments such as .NET or J2EE. Developers can begin by getting logic correct, then make targeted changes to tune the concurrency/threading settings to maximize performance where needed.

Of course, with this great power comes great responsibility. The ease of changing these settings means that no application is more than a few checkboxes away from bad performance due to thread-thrashing, or logic errors due to race conditions. This article will survey the different options and present some guidelines, but it will always be the responsibility of the application developer to gather the metrics, analyze the logic, and provide the insight necessary to ensure correct and high-performance behavior. Those analytical skills are not so different from what would be applied in other environments, but the big win with StreamBase is that the plumbing is all taken care of for you.

2. Concurrency Settings

This section covers the variety of synchronicity and concurrency options available for different module hosting mechanisms. Some suggestions are made about how and when to use them, and what to look out for.

2.1 Synchronous

A cross-container reference can be marked synchronous, and similarly a reference to a contained module will be synchronous if the module is not set to run in its own thread.

NOTE: This synchronous behavior applies to the delivery of the tuple to its target only. There is no way to guarantee that all downstream processing of the tuple will happen synchronously, as the receiving module may implicitly or explicitly transfer control to other threads, or trigger the processing of a tuple from a non-obvious queue.

Careful analysis of the logic of the receiving module can determine how much of the processing of the received tuple will be done in the context of the originating thread.

2.2 Asynchronous

By default, cross-container connections are asynchronous, and likewise anytime a tuple crosses a boundary into a component or module running in its own thread (or threads), delivery and downstream processing will be asynchronous. For asynchronously delivered tuples, processing in the originating thread will continue as soon as the tuple has been successfully enqueued for delivery to the target thread.
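
For intuition only, here is a minimal Java sketch of that hand-off (a BlockingQueue standing in for the StreamBase-managed queue; none of this is StreamBase API): the originating thread continues as soon as the tuple is enqueued, while a separate consumer thread drains the queue.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncHandoff {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Consumer thread: plays the role of the module running in its own thread.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String tuple = queue.take();      // blocks until a tuple arrives
                    System.out.println("processed " + tuple);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        // Originating thread: continues as soon as the tuple is enqueued.
        queue.put("tuple-1");
        System.out.println("originating thread continues immediately");
        Thread.sleep(100);                            // give the consumer a moment
    }
}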

2.3 Multi-Threaded

The options for enabling multi-threading in Module References and Extension Points are similar but slightly different in their presentation and meaning.

By selecting “Run this component in its own thread” on the Concurrency tab, a Module Reference or Extension Point becomes multi-threaded. The logic within the contained module or modules will process tuples asynchronously from the parent module’s processing.

For a Module Reference, there is also a Multiplicity setting, which is set independently of the “run in own thread” setting. If Multiplicity is set to a value greater than 1 and multi-threading is enabled, then a thread will be created for every instance of the module (as reflected in the fact that the checkbox text for multi-threading changes to “Run each instance of this component in its own thread”).

For an Extension Point, the meaning of this setting is less clear. The multi-threading checkbox on the Concurrency tab only says “Run this component in its own thread”, but if you have multiple module instances hosted within the Extension Point, then each instance will get its own thread.

The following tuple delivery settings apply to Extension Points and Module References, as indicated below.

2.3.1 Broadcast (Extension Point and Module Reference)

This setting is just what it sounds like: any tuple on the given input stream will be delivered to every module instance. The behavior is the same for Extension Points and for Module References.

This setting should be used sparingly. Except for very specific types of operations, if you have multiple module instances receiving every tuple as Broadcast, then this is probably a sign that the multiple instances are duplicating work and making inefficient use of system resources. A good use case for this would be to update the module instances on changes in system state or relevant configuration items that affect all running module instances.

2.3.2 Round Robin (Module Reference only)

Successive tuples will be routed to the module instances in order. It is not exposed anywhere, but each instance has an instance ID number, starting from 0. So if the Multiplicity for a Module Reference is set to 4, then the first tuple on an input stream set to use Round Robin dispatch will go to instance 0. The second will go to instance 1, then 2, 3, and 0 for the following tuples. The Nth tuple will go to instance (N-1) modulo M, where M is the number of module instances.
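
Since the instance IDs are deterministic, the dispatch arithmetic is easy to verify. Here is a minimal Java sketch (illustrative only, not StreamBase code) of the modulo rule described above:

public class RoundRobinDemo {
    public static void main(String[] args) {
        int m = 4;                              // Multiplicity of the Module Reference
        for (int n = 1; n <= 6; n++) {
            int instance = (n - 1) % m;         // Nth tuple -> instance (N-1) mod M
            System.out.println("tuple " + n + " -> instance " + instance);
        }
        // Prints: tuple 1 -> instance 0, tuple 2 -> instance 1, ...,
        // wrapping back to instance 0 at tuple 5.
    }
}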

Use this dispatch style if the processing is stateless (i.e. it depends only on the information contained within the tuple), or if the relevant state information is guaranteed to be available to all module instances (such as via a shared placeholder table, or by instance-private data that is kept in sync via Broadcast updates).

NOTE: Stateless processing is the ideal use case for this dispatch style, since the requirements to share or synchronize state across the multiple instances could lead to resource contention or duplicated effort. In general, this would be used for computationally-intensive operations, since file or network I/O operations would often lead to resource contention as well.

For stateless, computationally-intensive operations, Round Robin dispatching will generally give very even load balancing.

2.3.3 Numeric (Module Reference only)

As noted above, each module instance has an instance ID number, starting from 0. For Numeric dispatching, a Dispatch Expression that resolves to an integer value must be specified. This may make use of the built-in functions (such as hash()), or it could be calculated or looked up in logic before hitting the Module Reference. For a Numeric dispatch value of N, going to a Module Reference with M instances, the tuple will be delivered to instance N modulo M.

NOTE: A common example for Numeric dispatch is to partition some processing based on symbol, using hash(Symbol) as the Dispatch Expression. This will ensure that all tuples for a given symbol will hit the same module instance, and it will ensure that the symbols will be distributed evenly across those instances. However, it will not ensure that the load will be balanced evenly, since a small number of the most liquid symbols often account for a large percentage of activity.

When partitioning by symbol, it is strongly recommended to calculate the Dispatch Expression using a mechanism that will tend to balance the load, such as a lookup table or a calculation based on symbol ranges, rather than hash(Symbol).
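
To make the difference concrete, here is an illustrative Java sketch (not StreamBase code; String.hashCode() stands in for the built-in hash() function, and the hot-symbol assignments are hypothetical) comparing plain hash dispatch with a lookup-based Dispatch Expression:

import java.util.HashMap;
import java.util.Map;

public class NumericDispatchDemo {
    static final int M = 4;                     // number of module instances

    // Naive: even distribution of symbols, but not of load.
    static int hashDispatch(String symbol) {
        return Math.floorMod(symbol.hashCode(), M);   // instance = N mod M
    }

    // Load-aware: hot symbols are pinned to instances by an explicit lookup.
    static final Map<String, Integer> HOT_SYMBOLS = new HashMap<>();
    static {
        HOT_SYMBOLS.put("AAPL", 0);             // hypothetical assignments derived
        HOT_SYMBOLS.put("SPY", 1);              // from observed per-symbol volume
    }

    static int lookupDispatch(String symbol) {
        Integer pinned = HOT_SYMBOLS.get(symbol);
        return pinned != null ? pinned : hashDispatch(symbol);
    }

    public static void main(String[] args) {
        for (String s : new String[] {"AAPL", "SPY", "XYZ"}) {
            System.out.println(s + ": hash->" + hashDispatch(s)
                    + ", lookup->" + lookupDispatch(s));
        }
    }
}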

While all instances in a given Module Reference must have the same values for their Module Parameters, Numeric dispatch on an input stream used for setting instance-specific values (stored in Query Tables or Dynamic Variables) makes it possible to set particular values for specific instances. This can be a very useful pattern, so we will work through a simple example.

[Figure: Setting Params Example]

TriggerInit is a Once Input adapter which causes the module instance configurations to be initialized from the values specified in SetConfigParams. We will name the 4 instances ONE, TWO, THREE, and CLYDE. The application Child.sbapp has 2 input streams, both with dispatch set to Numeric, as shown here:

[Figure: Concurrency settings for child module]

The parameters passed in are each sent to a specific instance via the InstanceID field, and we can see the results in the OutParams stream’s output. The requests for processing are dispatched based on a hash of the field Key. From the OutResults stream’s output, we can see that Z is set to the sum of X and Y, and that the name of the instance which did the processing is included in the result. We also see that for a given value of Key, the request will always be dispatched to the same instance (as evidenced by the InstanceName value on the output).

The implementation of Child.sbapp is shown here:

[Figure: Implementation of Child.sbapp]

NOTE: This can be a very useful pattern, but it must be used with care. In general, where you want different parameter values for individual module instances, you are better off using an Extension Point, which provides a clear mechanism for this through Module Parameters.

2.3.4 Name (Extension Point only)

When you add an Extension Point to your application, each module instance must be assigned a string literal name.

NOTE: This name is a string literal. It cannot use expressions or reference global or module parameter values. Even so, it is a good practice to define constants with corresponding values and use them whenever you need to specify an instance name outside of the Extension Point configuration.

The Name Dispatch Style is very similar to Numeric, with the following exceptions:

  1. The Dispatch Expression must resolve to a string value, not an integer value.
  2. The Dispatch Expression must match a specific instance name; otherwise, an exception will be thrown indicating the failure to dispatch the tuple.

This places additional burden on the containing application to include some lookup or calculation logic to choose the specific module instance to process the tuple. It also generally means that the instance name will be a field in the schema for streams that are to be dispatched by instance name.

NOTE: Using Extension Points plus the Name Dispatch Style is a very flexible and powerful approach. Use this where different module instances have different behaviors or configurations, and logic outside the module can determine the desired processing behavior based on some combination of current configuration and the contents of the tuple.

A good practice is to encapsulate that routing logic (whose end result is to decorate the tuple with the name of the target instance) into its own module.
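
As a sketch of what that routing module computes, here is an illustrative Java fragment (the instance names and the quantity-based rule are hypothetical, not from any real application): its only job is to decorate each tuple with the name of the target instance.

import java.util.HashMap;
import java.util.Map;

public class NameRouterDemo {
    // Mirrors the string literal instance names configured on the Extension Point
    // (hypothetical names, for illustration only).
    static final String FAST_PATH = "FastPath";
    static final String SLOW_PATH = "SlowPath";

    // Routing rule: pick an instance name from tuple contents and configuration.
    static String route(Map<String, Object> tuple) {
        int quantity = (int) tuple.get("Quantity");
        return quantity > 1000 ? SLOW_PATH : FAST_PATH;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("Quantity", 50);
        // Decorate the tuple with the target instance name before dispatch;
        // the Name Dispatch Style then matches this field against instance names.
        tuple.put("TargetInstance", route(tuple));
        System.out.println(tuple.get("TargetInstance"));   // prints FastPath
    }
}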

3. Concurrency Surprises

Sometimes new threads or changes in execution order can arise in surprising ways. This section will give some guidance on how to recognize the potential for these surprises and how to handle them accordingly.

3.1 Adapters

Many input adapters will run in a separate thread, or start their own worker thread(s) to communicate with the external systems. It is up to the implementation of the adapter how control is passed between the worker thread(s) and the containing module’s main execution thread(s).

NOTE: Adapters or Operators which create their own threads will not generally have the circle highlight drawn on the EventFlow canvas that we associate with components running in their own threads. Just because a component is not drawn with the circle highlight does not mean that all of its operation is synchronous.

Probably the most common example of this is the CSV File Reader Input Adapter. Consider the following logic:

[Figure: CSV File Reader Example]

ReadIndex reads a catalog of items (PK is ItemID) and writes them into the qtIndex Query Table. ReadOrders reads a list of orders (each containing an ItemID and a Quantity). Each order is decorated with the ItemName value pulled out of qtIndex based on the ItemID, then sent out the OutOrders stream.

The Split operator ensures that ReadIndex is started before ReadOrders. However, because of details of the CSV File Reader’s implementation, the actual reading of the file and the emission of the tuples is done in another thread. This means that ReadIndex is not guaranteed to have finished reading its file when ReadOrders starts reading. This results in a race condition where items referenced in orders may not appear in qtIndex at the time that GetItemName is trying to look them up.

NOTE: When you use any adapter or operator, especially one that does some I/O, read the documentation carefully to determine how best to synchronize the behavior. A good guideline is that any Adapter that has an optional event port (like the CSV File Reader) probably does its work asynchronously.

Here is how the correct logic looks:

[Figure: Correct Synchronizing of CSV Reading]

Here we have the optional event port enabled, and we use a Filter to ensure that we only proceed once the Index file has been completely processed.
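
For readers more comfortable in code than on the canvas, here is a rough Java analogue of this gating pattern (all names are hypothetical; this is a sketch of the idea, not of the adapter's implementation): dependent work is held back until the completion event for the prerequisite load arrives.

import java.util.ArrayList;
import java.util.List;

// Illustrative gate: holds back "orders" until the "index loaded" event fires,
// mirroring the Filter-on-event-port pattern described above.
public class LoadGateDemo {
    private boolean indexLoaded = false;
    private final List<String> pendingOrders = new ArrayList<>();

    synchronized void onIndexLoaded() {         // the CSV reader's completion event
        indexLoaded = true;
        for (String order : pendingOrders) process(order);
        pendingOrders.clear();
    }

    synchronized void onOrder(String order) {
        if (indexLoaded) process(order);        // safe: index lookups will succeed
        else pendingOrders.add(order);          // too early: hold the order back
    }

    void process(String order) {
        System.out.println("processing " + order);
    }

    public static void main(String[] args) {
        LoadGateDemo gate = new LoadGateDemo();
        gate.onOrder("order-1");                // arrives before the index is ready
        gate.onIndexLoaded();                   // releases the buffered order
        gate.onOrder("order-2");                // processed immediately
    }
}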

3.2 Cycles

NOTE: Many thanks to Steve Barber for his comments to this article pointing out that my understanding of execution order for cycles was incomplete. This section has been revised as a result of his clarification.

The definitive source of information is: StreamBase Execution Order and Concurrency. I strongly suggest you read that article and experiment with this type of logic yourself.

Another area to watch for concurrency surprises is cycles in your EventFlow logic. Consider the following simple application:

[Figure: Cycle in EventFlow Logic]

Note that this logic does not typecheck, as there is a circular dependency: the output schema from StartLoop isn’t known because one of the inputs to StartLoop isn’t known. StreamBase Studio provides a very handy mechanism to resolve this: the link “Use the schema that StreamBase Studio detected…” will explicitly set the schema of the arc from IsCountZero to StartLoop. Click that link, and presto! You’ll have something that looks like this:

[Figure: Resolved and Typechecked Cycle]

The arc is highlighted in blue to indicate that it has an explicitly-set schema. It is no surprise that the schema for that arc has been explicitly set, since we set it explicitly by clicking that link in SB Studio. What is surprising is that this has implications for the order in which tuples are processed in some cases.

NOTE: When an application includes a blue arc (with an explicitly set schema), the StreamBase Platform implicitly creates a queue associated with that arc, in order to prevent stack overflows as a result of recursive calls. As the developer, you have no control over which queue will be serviced at any point, so execution order may not be what you expect.

In the simple example above, the behavior is deterministic and it matches naive expectations. A given tuple dequeued from InCalcRequest will be processed to completion (in this case, fired through the loop decrementing the counter and adding up Sum, and sent out OutCalcDone) before any other tuples are dequeued from InCalcRequest.

To clarify: for the above sample application, given the following inputs:

RequestID="Big Bird", Count=4
RequestID="Grover", Count=42
RequestID="Mr. Hooper", Count=99

We get the following outputs in this order (same as entry order):

RequestID="Big Bird", Count=0, Sum=10
RequestID="Grover", Count=0, Sum=903
RequestID="Mr. Hooper", Count=0, 4950

With a slightly more complicated example including a Split operator within the loop, we can see the impact of the queues created for the blue arcs. Consider this application:

[Figure: Cycle with Split]

A single tuple in InCalcRequest results in 6 tuples sent out OutResults. For an input tuple with RequestID set to “Cookie Monster”, we will see the following outputs:

RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild1",Complete=true
RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild2",Complete=true
RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild3",Complete=true
RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild1",Complete=true
RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild2",Complete=true
RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild3",Complete=true

Based only on the inputs and outputs, we would not infer anything surprising about the processing order. The Split operator sends 3 grandchildren from the first child through the loop and out the output stream, then another 3 grandchildren from the second child. However, digging a bit deeper with some logging, we find that things are not necessarily as they seem.

Consider this log excerpt:

UnionIn1 - (RequestID="Cookie Monster",Message="Parent1",Completed=false)
LogUnionOut - (RequestID="Cookie Monster",Message="Parent1",Completed=false)
UnionIn2 - (RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild1",Completed=true)
UnionIn2 - (RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild2",Completed=true)
UnionIn2 - (RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild3",Completed=true)
UnionIn3 - (RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild1",Completed=true)
UnionIn3 - (RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild2",Completed=true)
UnionIn3 - (RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild3",Completed=true)
LogUnionOut - (RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild1",Completed=true)
LogUnionOut - (RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild2",Completed=true)
LogUnionOut - (RequestID="Cookie Monster",Message="Parent1:Child1:Grandchild3",Completed=true)
LogUnionOut - (RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild1",Completed=true)
LogUnionOut - (RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild2",Completed=true)
LogUnionOut - (RequestID="Cookie Monster",Message="Parent1:Child2:Grandchild3",Completed=true)

Here we see that the 3 grandchildren from the first branch of the SendChildren Split operator are all enqueued at StartLoop, then the 3 grandchildren from the second branch are also enqueued, then all 6 are dequeued, in the order that they were enqueued.

Rule 6 from StreamBase Execution Order and Concurrency states: When multiple tuple sources are ready to be processed, the one to process is selected arbitrarily. Note that this applies to tuple sources, which are clarified as: Tuple sources include input streams, Metronome operators, adapters, and so on. Note that this list does not include the queues created for blue arcs in a single-threaded application. In my experimentation, tuples enqueued into the ports on StartLoop were always dequeued in the same order that they were sent, regardless of which port the tuples were sent on, or the number of tuples sent.

NOTE: If you have similar cycles in your logic and you need to guarantee that results are processed in a specific order that differs from how the queues are processed, then you must implement your own synchronizing mechanism that will store requests and trigger the calculation of the next request once the previous request has completed. This is non-trivial and beyond the scope of this article.

For most real-time, low-latency systems, the processing order isn’t important; what’s important is the speed of processing. If you have a case where processing order is important, take a step back to see if some change in your logic can make it unimportant. This may mean moving state information out of Dynamic Variables and Query Tables into tuples to make the processing logic stateless, thereby making the processing of each tuple an independent operation.

4. Conclusion

In this article, we have presented an overview of the different explicit and implicit concurrency settings, as well as some guidance on when and how to use the different approaches. This is a very rich subject worthy of deeper discussion, but hopefully this article will arm both new and experienced StreamBase developers with some helpful tools in designing, tuning, and debugging their systems.

If you have found this article interesting or helpful, please take some time to post your comments, questions, or suggestions on this blog.

Thanks,
Phil Martin
Datasports Inc.


Written by datasports

Oct 5, 2011 at 7:02 PM

Posted in Best Practices

3 Responses


  1. Hi Phil,

    This is a terrific article, thanks for posting it.

    I want to point out something about how StreamBase processes tuples in loops. The StreamBase runtime does guarantee the order in which tuples are processed in loops. All the tuples queued at the StartLoop Union in your example in your section 3.2 will be processed to completion before another tuple is processed from the upstream tuple source, in this case the input stream InCalcRequest. This rule makes the processing order within a loop totally deterministic (assuming there’s no other parallel region introduced by the components that make up the loop’s body).

    This guarantee is now made in writing in Rule 5 of the newly revised Rules of Execution Order on the StreamBase Execution and Concurrency page of the documentation set. The newly re-documented Rules were just released as part of StreamBase 7.1.3 a few weeks ago. These rule revisions are documentation revisions, not implementation revisions — the idea was to finish documenting all the execution order rules that have been in place in the StreamBase runtime for a long, long time.

    Sometimes it is possible to observe output tuples displayed in a different order than they were actually processed, especially in the StreamBase Studio Application Output View, which (sometimes unhelpfully) sorts its display by default on a local timestamp truncated to the nearest second. This display order is not necessarily the order the tuples were generated or even dequeued by the client in Studio and can be misleading. If you want to get a feel for the actual order of processing, try using tracing and the Trace Debugger, or maybe strategically placed Log Output Adapters in the flow. sbc dequeue from the command line is good, too, with respect to the sequence of tuples produced by a single output stream, but inter-stream ordering may not be preserved on the client side. In general, be a little skeptical when using any client as a way to infer the order of processing within the server.

    • Steve, thanks very much for the clarification on processing order, I really appreciate it. I love having discussions that lead me into the dark corners of SB.

      The case with a cycle where I was observing some processing order surprises did actually involve multiple streams of 3 separate referenced modules contained within the same parent. I put together this simple example without realizing that it would be deterministic. I will correct the article this weekend.

      Thanks again,
      Phil.

      datasports

      Oct 7, 2011 at 7:52 PM

    • Thanks again for this comment and the further clarification offline. I have made significant updates to the discussion of cycles/loops in EventFlow logic, and added links to the documentation set you mentioned.

      datasports

      Oct 10, 2011 at 11:50 AM

