StreamCruncher - API and Configuration
Here is a list of classes provided by StreamCruncher as part of the API:
streamcruncher.api.StreamCruncher
streamcruncher.api.StreamCruncherException
streamcruncher.api.StartupShutdownHook
streamcruncher.api.InputSession
streamcruncher.api.OutputSession
streamcruncher.api.ParsedQuery
streamcruncher.api.ParserParameters
streamcruncher.api.QueryConfig
streamcruncher.api.QueryConfig.QuerySchedulePolicy
streamcruncher.api.ResultSetCacheConfig
streamcruncher.api.TimeWindowSizeProvider
streamcruncher.api.WindowSizeProvider
streamcruncher.api.DBName
streamcruncher.api.artifact.RowSpec
The JavaDocs provided in the downloadable package can also be consulted. The Test Cases included in the package also serve as demonstrations of the features.
streamcruncher.api.StreamCruncher
is the main class. The Input and Output Event
Streams are accessed using streamcruncher.api.InputSession
and streamcruncher.api.OutputSession,
respectively.
The "Running Query" is wrapped and handled using
streamcruncher.api.ParsedQuery and
streamcruncher.api.ParserParameters.
The classes in the streamcruncher.api.artifact
package should be used to provide hints to the
Kernel about the SQL/RDBMS equivalent data types
of the Events' properties.
Starting StreamCruncher (and stopping)
java -Dsc.config.file=.\resources\sc_config_mysql.properties
com.myproduct.CruncherClient
..
public class CruncherClient {
    public static void main(String[] args) {
        String prop = System.getProperty("sc.config.file");
        StreamCruncher cruncher = new StreamCruncher();
        cruncher.start(prop);

        //Option 1:
        //----------
        //Spawn a Thread to perform Event registration,
        //handling and other real work.
        .. ..
        //Main Thread just waits for Console message.
        cruncher.keepRunning();

        //Option 2:
        //----------
        //Perform Event registration, handling and other
        //real work.
        .. ..
        //Stops the Kernel.
        cruncher.stop();
    }
    .. ..
}
StreamCruncher supports a few Database products that can be used as the underlying datastore. These are configured in a Properties file that has to be provided to the Kernel during startup through the API.
Registering an Input Event Stream
String[] columnNames = { "country", "state", "city", "item_sku", "item_qty",
        "order_time", "order_id" };
//DB specific. MySQL shown here.
String[] columnTypes = { java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.Integer.class.getName(),
        java.sql.Timestamp.class.getName(),
        java.lang.Long.class.getName() };
//6 - Id column position. 5 - Timestamp column position.
RowSpec rowSpec = new RowSpec(columnNames, columnTypes, 6, 5);
String inputStreamName = "test";
cruncher.registerInStream(inputStreamName, rowSpec);
Registering the "Running Query"
//A Java String literal cannot span lines, so the Query is concatenated.
String rql = "select country, state, city, item_sku, item_qty, order_time, " +
        "order_id from test (partition by item_sku store " +
        "last 5 seconds max 5) as testStr where testStr.$row_status " +
        "is not dead";
//DB specific. MySQL shown here.
String[] resultColumnTypes = { java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.Integer.class.getName(),
        java.sql.Timestamp.class.getName(),
        java.lang.Long.class.getName() };
String queryName = "test_res_rql";
ParserParameters parameters = new ParserParameters();
parameters.setQuery(rql);
parameters.setQueryName(queryName);
parameters.setResultColumnTypes(resultColumnTypes);
ParsedQuery parsedQuery = cruncher.parseQuery(parameters);
QueryConfig config = parsedQuery.getQueryConfig();
//Processing policy.
config.setQuerySchedulePolicy(new QueryConfig.QuerySchedulePolicyValue(
        QuerySchedulePolicy.ATLEAST_OR_SOONER,
        5000));
cruncher.registerQuery(parsedQuery);
Inserting Events
Note that the Event property used as the Id must always be unique and should, ideally, increase monotonically.
InputSession inputSession = cruncher.createInputSession("test");
inputSession.start();
for(; /* Run in a loop, to insert multiple Events */ ;) {
    //Create the Event.
    Object[] event =
    ....
    //Send the Events to StreamCruncher.
    inputSession.submitEvent(event);
}
inputSession.close();
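One simple way to obtain Ids that are unique and strictly increasing within a single JVM is java.util.concurrent.atomic.AtomicLong. The sketch below is standalone and does not touch the StreamCruncher API: the EventIdSource class is a hypothetical helper, and the Event values are made up to match the column order of the "test" Stream registered earlier. It does not address uniqueness across restarts, which would need a persisted starting value.

```java
import java.sql.Timestamp;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the StreamCruncher API.
class EventIdSource {
    private final AtomicLong lastId;

    EventIdSource(long startAfter) {
        this.lastId = new AtomicLong(startAfter);
    }

    // Thread-safe: each call returns a value greater than all earlier ones.
    long nextId() {
        return lastId.incrementAndGet();
    }

    public static void main(String[] args) {
        EventIdSource ids = new EventIdSource(0L);

        // Column order: country, state, city, item_sku, item_qty,
        // order_time, order_id. All values are illustrative only.
        Object[] event = { "US", "California", "San Jose", "warp-drive",
                Integer.valueOf(2),
                new Timestamp(System.currentTimeMillis()),
                Long.valueOf(ids.nextId()) };

        System.out.println("order_id = " + event[6]);
    }
}
```

Sharing one such source among all the Threads that create Events keeps the Ids unique even when several Threads submit Events concurrently.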
Retrieving the generated/Output Stream Events
It is recommended that Input and Output processing be handled by separate Threads to maximize throughput.
String queryName = "test_res_rql";
OutputSession outputSession = cruncher.createOutputSession(queryName);
outputSession.start();
for(; /* Keep polling while more results are expected */ ;) {
    List<Object[]> events = outputSession.readEvents(10, TimeUnit.SECONDS);
    if (events.size() == 0) {
        continue;
    }
    .. ..
    System.out.println("Got a few Output Events");
    System.out.println("Retrying for more results...");
}
outputSession.close();
Customization
Custom Aggregate Functions can be created and
plugged into the Kernel. It involves extending 2
Classes - streamcruncher.api.aggregator.AbstractAggregator
and streamcruncher.api.aggregator.AbstractAggregatorHelper.
The Query below uses a custom Function called
test_fn:
select country, sum_qty, test_fn_val
from
test (partition by country store last 5 seconds
with sum(item_qty) as sum_qty,
custom(test_fn, order_id, J) as test_fn_val)
as testStr
where testStr.$row_status is new;
The first element within the brackets of the
custom(..) keyword is the name
of the Function under which it is registered with the Kernel.
It can take any number of comma-separated elements after that,
which get passed on to the Function at run-time.
The example Function has a return type of varchar(2), which is indicated by the Helper class.
public class TestAggregatorHelper extends AbstractAggregatorHelper {
    public TestAggregatorHelper() {
        super("test_fn", TestAggregator.class);
    }

    @Override
    public String getAggregatedColumnDDLFragment(DBName dbName,
            String[] params,
            LinkedHashMap<String, String> columnNamesAndTypes)
            throws Exception {
        return "varchar(2)";
    }
}
TestAggregator, shown below,
is configured in the Query to use the values in the
order_id column.
The example is meant for demonstration purposes only and
is rather contrived.
It sums the integer part of the column values,
divides the sum by 26 and converts the resulting
remainder into an ASCII character. This is then
concatenated with the "J" that is supplied in the Query.
Thus the Function returns a 2 character String by
performing some calculation on the values in the Window.
The Kernel can create any number of instances of the Function.
Instances should not be shared by way of Singletons etc.
The Kernel initializes each instance and provides
it with all the Column names and their
Database types present in the Stream.
TestAggregator uses this to locate the position of
the order_id column it has
been configured to use.
The instance is invoked if the Window contents change, by the end of every Query execution. It is invoked with column values of Events that got expelled from the Window as well as the ones that were added in the current run.
public class TestAggregator extends AbstractAggregator {
    private int columnPosition;
    private int sumOfOrderIds;
    private char suffix;

    @Override
    public void init(String[] params,
            LinkedHashMap<String, String> columnNamesAndTypes,
            AggregationStage aggregationStage) {
        super.init(params, columnNamesAndTypes, aggregationStage);
        for (String column : columnNamesAndTypes.keySet()) {
            if (params[0].equalsIgnoreCase(column)) {
                break;
            }
            columnPosition++;
        }
        suffix = params[1].charAt(0);
    }

    @Override
    public String aggregate(List<Object[]> removedValues,
            List<Object[]> addedValues) {
        if (removedValues != null &&
                getAggregationStage() != AggregationStage.ENTRANCE) {
            for (Object[] objects : removedValues) {
                Object object = objects[columnPosition];
                // Consider only non-nulls.
                if (object != null && object instanceof Long) {
                    Long l = (Long) object;
                    sumOfOrderIds = sumOfOrderIds - l.intValue();
                }
            }
        }
        if (addedValues != null) {
            for (Object[] objects : addedValues) {
                Object object = objects[columnPosition];
                // Consider only non-nulls.
                if (object != null && object instanceof Long) {
                    Long l = (Long) object;
                    sumOfOrderIds = sumOfOrderIds + l.intValue();
                }
            }
        }
        int i = (sumOfOrderIds % 26) + 65;
        char c = (char) i;
        return new String(new char[] { c, suffix });
    }
}
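To make the arithmetic concrete, the last three lines of aggregate(..) can be tried out in isolation. The sketch below is standalone and does not use the StreamCruncher API; the class name TestFnArithmetic and the order_id values are made up for illustration. It assumes the running sum stays non-negative, because Java's % operator yields a negative remainder for a negative sum.

```java
// Standalone illustration of the letter calculation in TestAggregator.
class TestFnArithmetic {
    // Same expression as the end of TestAggregator.aggregate(..).
    static String toCode(int sumOfOrderIds, char suffix) {
        int i = (sumOfOrderIds % 26) + 65; // 65 is 'A' in ASCII
        char c = (char) i;
        return new String(new char[] { c, suffix });
    }

    public static void main(String[] args) {
        // Window holds order_ids 10, 11 and 12:
        // sum = 33, 33 % 26 = 7, 65 + 7 = 72, which is 'H'.
        System.out.println(toCode(10 + 11 + 12, 'J')); // prints HJ

        // order_id 10 is expelled from the Window:
        // sum = 23, 23 % 26 = 23, 65 + 23 = 88, which is 'X'.
        System.out.println(toCode(33 - 10, 'J')); // prints XJ
    }
}
```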
Before the Query that uses the custom Function is registered, the Helper class must be registered with the Kernel.
StreamCruncher provides a variety of Aggregate Functions.
In some cases, the change (Difference) in the value of the
Aggregate is more useful than the Aggregate itself. For
most in-built Aggregates other than the
max and min
Functions, an additional directive called $diff,
when specified, tells the Function to output the difference
between the current value and the previous aggregate value.
In some cases, this simple difference will not suffice. For
example, in Temperature control systems
it is more useful to take the highest value that is considered
safe and use that as a baseline against which the latest
aggregate is compared. Similarly, for Stock value changes, the
baseline could be the highest value in the last 1 hour.
StreamCruncher allows custom
Baseline Providers to be plugged in.
select columns..
from ping_event (partition by levela, levelb
store last 5 seconds
with pinned count(event_id $diff) as change,
count(event_id $diff 'DiffBaseline/CountBL') as change2,
count(event_id) as plain_count) as heartbeat
where ..;
Each Baseline Provider must be registered with the Kernel before the Query is registered.
cruncher.registerProvider("DiffBaseline/CountBL", CountDiffBaselineProvider.class);
cruncher.registerProvider("DiffBaseline/AvgBL", StatsDiffBaselineProvider.class);
public class CountDiffBaselineProvider
        extends DiffBaselineProvider<Integer> {
    public CountDiffBaselineProvider() {
        super();
        System.out.println("Using CountDiffBaselineProvider");
    }

    /**
     * @return Fixed Baseline. Always Zero.
     */
    @Override
    public Integer getBaseline(Integer oldValue, Integer newValue) {
        return 0;
    }
}

public class StatsDiffBaselineProvider
        extends DiffBaselineProvider<Double> {
    public StatsDiffBaselineProvider() {
        super();
        System.out.println("Using StatsDiffBaselineProvider");
    }

    /**
     * @return Fixed Baseline. Always Zero.
     */
    @Override
    public Double getBaseline(Double oldValue, Double newValue) {
        return 0.0;
    }
}
Baseline Providers must extend
streamcruncher.api.aggregator.DiffBaselineProvider.
Baseline Providers for the count function
must be based on the java.lang.Integer type.
Similarly, Providers for the other Statistical
functions must be based on java.lang.Double.
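The difference between a plain $diff and a $diff with a Baseline Provider can be sketched with plain arithmetic. The block below is standalone and rests on an assumption that the text does not spell out: that the Kernel reports "current aggregate minus baseline" when a Provider is present. The class and method names are hypothetical and the counts are made up.

```java
// Standalone arithmetic sketch; not part of the StreamCruncher API.
class DiffBaselineSketch {
    // Plain $diff: current aggregate minus the previous aggregate.
    static int plainDiff(int previousValue, int currentValue) {
        return currentValue - previousValue;
    }

    // ASSUMPTION: with a Baseline Provider, the value it returns takes
    // the place of the previous aggregate in the subtraction.
    static int baselineDiff(int baseline, int currentValue) {
        return currentValue - baseline;
    }

    public static void main(String[] args) {
        int[] counts = { 3, 5, 4 };  // made-up successive count(event_id) values
        int fixedBaseline = 0;       // what CountDiffBaselineProvider returns

        for (int i = 1; i < counts.length; i++) {
            System.out.println("plain_count=" + counts[i]
                    + " change=" + plainDiff(counts[i - 1], counts[i])
                    + " change2=" + baselineDiff(fixedBaseline, counts[i]));
        }
    }
}
```

Under this assumption, a Provider that always returns zero makes the baseline difference equal to the aggregate itself, while a Provider that returns a "safe maximum" would report how far the latest aggregate is from that limit.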
StreamCruncher allows different Windows in the same
Partition to have different sizes. For example, consider a Query
such as .. from test (partition
by country, state, city store last 10 seconds max 5) as
stream1.., into which 2 Events stream in:
("US", "California", "San Jose", "warp-drive",
.. more properties) and ("India", "Karnataka",
"Bangalore", "force-field" .. other props). The
US > California > San Jose
sub-Window can have a "20 second Window size with max
7 Events" while the India >
Karnataka > Bangalore sub-Window can have a
"10 second Window size with max 5 Events". The values
defined in the Query are used as the default values.
select ..columns.. from test (partition by item_sku store last 10 seconds max 5 'TimeWindowSize/MyProv') as testStr where ..;
A custom WindowSize Provider is declared in the Query, as part of the Window definition clause. The provider must be given a logical name like "TimeWindowSize/MyProv" within single quotes when declared in the Query. The actual Provider has to be registered with the Kernel before the Query is registered.
2 kinds of Window Size Providers exist:
streamcruncher.api.TimeWindowSizeProvider
and
streamcruncher.api.WindowSizeProvider,
based on whether the Partition is a Time Based Window or
not. The Query still needs a default size to be specified
along with the name of the Provider.
A custom Provider would look like this:
public class CustomWindowSizeProvider extends
        TimeWindowSizeProvider {
    /**
     * @param levelValues
     *            Partition level values.
     */
    @Override
    public long provideSizeMillis(Object[] levelValues) {
        if (levelValues[0].equals("US") &&
                levelValues[1].equals("California") &&
                levelValues[2].equals("San Jose")) {
            // 20 second Window.
            return 20 * 1000;
        }
        // Default for the others.
        return super.provideSizeMillis(levelValues);
    }

    /**
     * @param levelValues
     *            Partition level values.
     */
    @Override
    public int provideSize(Object[] levelValues) {
        if (levelValues[0].equals("US") &&
                levelValues[1].equals("California") &&
                levelValues[2].equals("San Jose")) {
            // 7 size Window.
            return 7;
        }
        // Default for the others.
        return super.provideSize(levelValues);
    }
}
The Kernel instantiates Providers using the empty Constructor. The Kernel creates a new instance for each level of Partition, where it is declared.
Persistence and Event purging
StreamCruncher can be shut down and restarted without having to re-register the Queries and Event Streams. They will be loaded automatically when the Kernel starts again. However, if the Database retains the artifacts after a restart, StreamCruncher re-creates them and, as a result, the data in them is deleted.
StreamCruncher does not store the Input and Output Streams for longer than it is required by the Kernel. As a result, the Events should not be expected to be available after the Kernel has consumed/generated them. If the Events have to be persisted, then it must be done outside the Kernel, by the User.
We have already seen how Partitions create smaller Windows based on the columns/properties. If the Partition has been specified to use Sliding Windows or Random Events Windows or Highest/Lowest values Windows, then these Windows will stay in Memory as long as the Kernel is running. In Time based or Tumbling Windows however, the Window is destroyed if all the Events in the Window expire. Such Windows are re-created by the Kernel if new Events arrive that need to be placed in the Window that was destroyed earlier. Aggregates defined on such Windows also follow the lifecycle of their respective Windows.
Query Configuration and Tuning
Each Query that is registered has a Configuration
class called streamcruncher.api.QueryConfig that
can be used to fine tune the behaviour of the
Query. It can be retrieved through the
StreamCruncher API using the
getQueryConfig(queryName) method or from the
ParsedQuery object
(ParsedQuery.getQueryConfig())
that was created during Query registration. All the
settings can be changed even while the Query is
running.
The first option sets up the Scheduling policy for the query. There are 2 options:
1) Fixed, where the Query executes every 'x' milliseconds. This setting makes the Query run irrespective of whether the Events arrive or expire. This option is similar to scheduling a simple SQL Query in the Database. This setting is not suitable for Time and Tumbling Windows as the Query may not execute immediately after an Event's expiry.
2) Atleast-or-sooner, where the Query executes whenever an Event arrives or expires from a Window. If a Window has at least one Event pending consumption, then the Query runs in immediate succession. The consumption pattern is, however, decided by the nature of the Window. If none of these scenarios arise, the Query executes at the intervals specified.
getQuerySchedulePolicy()
setQuerySchedulePolicy(querySchedulePolicyValue)
enum QuerySchedulePolicy {
FIXED, ATLEAST_OR_SOONER;
}
QuerySchedulePolicyValue(policy, timeMillis)
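The difference between the two policies can be summarized as a single decision: should the Query run now? The block below is a simplified, standalone model of that decision as described above, not the Kernel's actual scheduler. The Policy enum merely mirrors the names in QuerySchedulePolicy, and the method and its parameters are hypothetical.

```java
// Simplified model of the two Scheduling policies; not Kernel code.
class SchedulePolicySketch {
    // Local stand-in that mirrors the names of QuerySchedulePolicy.
    enum Policy { FIXED, ATLEAST_OR_SOONER }

    static boolean shouldRunNow(Policy policy,
                                long millisSinceLastRun,
                                long intervalMillis,
                                boolean eventArrivedOrExpired) {
        boolean intervalElapsed = millisSinceLastRun >= intervalMillis;
        if (policy == Policy.FIXED) {
            // Runs on the clock only; Events do not influence the schedule.
            return intervalElapsed;
        }
        // Runs as soon as an Event arrives or expires,
        // and otherwise at the configured interval.
        return eventArrivedOrExpired || intervalElapsed;
    }

    public static void main(String[] args) {
        // 1 second after the last run, with a 5 second interval
        // and an Event that has just arrived:
        System.out.println(shouldRunNow(Policy.FIXED, 1000, 5000, true));             // false
        System.out.println(shouldRunNow(Policy.ATLEAST_OR_SOONER, 1000, 5000, true)); // true
    }
}
```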
If the Query is running with the Atleast-or-sooner option, then the Event-weight option allows finer control over Query triggering. For each Event Stream that feeds Events into a Query that has been registered, weights can be specified for each Event coming from a Stream. Whenever the combined weight of all pending Events goes above zero, the Query is triggered.
For example: Suppose that Query QRY1 consumes Events from 2 Streams, A and B. Events from Stream A are more frequent, but the information carried by 5 of them has to be held and correlated with 1 Event from B. So, those 5 Events will either have to be held in the Window or can be made to wait for the main Event from B. The Events can be held back from triggering the Query individually by setting A's Event weight to -1 and B's to 6. So, 5 Events from A add up to -5, and "(5 x A) + B = (5 x -1) + 6 = 1", so the Query fires when the Event from B arrives because the total weight then goes above zero. However, if the Query has a Time based Window or Tumbling Window, then the Weights do not stop the Query from executing when the Events expire from those Windows.
Another option would be to assign a weight of 0 to A and 1 to B. This way, Events from A are not allowed to trigger the Query.
float getUnprocessedEventWeight(key)
setUnprocessedEventWeight(key, weight)
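The weight arithmetic from the example can be checked with a few lines of standalone code. This is only a model of the rule stated above (the Query is triggered when the combined weight of pending Events goes above zero); the class and method names are hypothetical and nothing here calls the StreamCruncher API.

```java
// Standalone model of the Event-weight trigger rule; not Kernel code.
class EventWeightSketch {
    // Combined weight of the pending Events from Streams A and B.
    static float pendingWeight(int pendingFromA, float weightA,
                               int pendingFromB, float weightB) {
        return pendingFromA * weightA + pendingFromB * weightB;
    }

    // The Query is triggered only when the combined weight is above zero.
    static boolean triggers(float combinedWeight) {
        return combinedWeight > 0f;
    }

    public static void main(String[] args) {
        // Weights -1 for A and 6 for B.
        float onlyA = pendingWeight(5, -1f, 0, 6f);  // 5 x -1 = -5
        float withB = pendingWeight(5, -1f, 1, 6f);  // -5 + 6 = 1
        System.out.println(onlyA + " -> " + triggers(onlyA)); // -5.0 -> false
        System.out.println(withB + " -> " + triggers(withB)); // 1.0 -> true

        // Alternative: weights 0 for A and 1 for B.
        System.out.println(triggers(pendingWeight(5, 0f, 0, 1f))); // false
        System.out.println(triggers(pendingWeight(5, 0f, 1, 1f))); // true
    }
}
```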
If a Query is scheduled to run within the next few Milliseconds (Atleast-or-sooner only), then this setting prevents an Event from triggering the Query if the time left is less than the margin specified.
long getForceScheduleMarginMsecs()
setForceScheduleMarginMsecs(forceScheduleMarginMsecs)
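Read literally, the margin is a comparison between the time left until the next scheduled run and the configured value. The following standalone sketch expresses just that comparison; the class, the method and its parameters are hypothetical, and the Kernel's real bookkeeping is certainly more involved.

```java
// Standalone model of the force-schedule margin check; not Kernel code.
class ForceScheduleMarginSketch {
    // An Event may trigger the Query early only if the scheduled run
    // is still at least marginMillis away.
    static boolean eventMayTrigger(long nextScheduledRunMillis,
                                   long nowMillis,
                                   long marginMillis) {
        long timeLeft = nextScheduledRunMillis - nowMillis;
        return timeLeft >= marginMillis;
    }

    public static void main(String[] args) {
        long margin = 200; // made-up margin in milliseconds

        // Scheduled run is 1500 ms away: the Event triggers the Query now.
        System.out.println(eventMayTrigger(10_000, 8_500, margin)); // true

        // Scheduled run is only 50 ms away: the Event waits for it.
        System.out.println(eventMayTrigger(10_000, 9_950, margin)); // false
    }
}
```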
The StreamCruncher Kernel is multi-threaded. If a Query execution takes too long, StreamCruncher starts issuing warnings and, after a few warnings, kills the Query's current run because it assumes that something has gone wrong. The Query's longest execution time should be measured and this setting adjusted accordingly to prevent such terminations.
getStuckJobInterruptionTimeMsecs()
setStuckJobInterruptionTimeMsecs(stuckJobInterruptionTimeMsecs)
Queries can be paused and resumed at will. When a Query is paused, then every few seconds, the Kernel keeps checking if that Query has been resumed. If the Query is going to be paused for a long time, then the resume-check interval must be increased suitably to conserve CPU resources.
long getResumeCheckTimeMsecs()
setResumeCheckTimeMsecs(resumeCheckTimeMsecs)
Sliding Windows move one Event at a time. So, if the Event Stream floods the System with too many Events, the System will slow down to a crawl because the Query's Sliding Window has to consume and execute one Event at a time. If the older Events can be dropped without fear of losing too much information, then this option allows the Query to keep up with the incoming Stream of Events by jumping towards the most recent Events. So, if there are 1000 pending Events, the pending-events-allowed is set to 30 and the Sliding Window's size is 50, then the first 920 Events are skipped. The Window consumes Events 921 to 970 (50 Events, the size of the Window) at once and the remaining Events, 971 to 1000, are consumed one at a time.
int getAllowedPendingEvents(key)
setAllowedPendingEvents(key, events)
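The split between skipped Events, Events loaded at once and Events consumed individually is simple arithmetic. The standalone sketch below computes it under two assumptions: the bulk step loads exactly as many Events as the Window can hold, and the most recent "allowed pending" Events are the ones consumed one at a time. The class and method names are hypothetical, and the Kernel may place the boundaries slightly differently.

```java
// Standalone arithmetic sketch of the catch-up behaviour; not Kernel code.
class PendingEventsSketch {
    /** Returns { skipped, consumedAtOnce, consumedOneByOne }. */
    static int[] plan(int pending, int allowedPending, int windowSize) {
        // The newest Events, up to the allowed number, are consumed singly.
        int oneByOne = Math.min(allowedPending, pending);
        // The Window is filled in one step with the Events just before them.
        int atOnce = Math.min(windowSize, pending - oneByOne);
        // Everything older than that is dropped.
        int skipped = pending - oneByOne - atOnce;
        return new int[] { skipped, atOnce, oneByOne };
    }

    public static void main(String[] args) {
        int[] p = plan(1000, 30, 50);
        // prints: skipped=920, at once=50, one by one=30
        System.out.println("skipped=" + p[0]
                + ", at once=" + p[1]
                + ", one by one=" + p[2]);
    }
}
```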
Since StreamCruncher uses an RDBMS underneath, all the performance tips that one would normally use to write SQL queries apply here too. StreamCruncher automatically creates an Index on the Primary-key/Event Id column of the Output Event Stream/Table.
1) The $row_status column in Partitions
internally relies on an Indexed column. Such
Indexed columns should be used to speed up
Queries.
2) The where-clause on the Stream must have, as its first condition, the one which filters out as many rows as possible.
3) When Events are being correlated, which is similar to an SQL Join, the "Driver Table/Event Stream" should be the Table/Stream which presents the smallest number of Rows to the Join operation.
For example:
The fulfillment_events.$row_status
is new picks up only the latest events for the Join
operation with order_events.
Pre-Filters can be attached to an Input Event
Stream and they can also look up dynamic data
using the (not) in ..
clause. Since such reference data does not change
frequently, the Kernel caches such results and
shares the cached data among other Queries that
use the same SQL in their Pre-Filters.
The Kernel therefore provides a way to specify
how often the data must be refreshed from the
Database.
ResultSetCacheConfig getResultSetCacheConfig(sqlSubQuery)
long getRefreshIntervalMsecs()
setRefreshIntervalMsecs(refreshIntervalMsecs)
The Kernel provides an API to retrieve the SQL Sub-Queries in a "Running Query" that will be cached by the Kernel, when it gets registered. This will be useful to retrieve the Cache configuration object to adjust the refresh times.
ParsedQuery parsedQuery = cruncher.parseQuery(parameters);
//Iterate through the list. Could be empty if
//there are no Pre-Filters with Sub-Queries.
for (String sql : parsedQuery.getCachedSubQueries()) {
    ResultSetCacheConfig cacheConfig = cruncher.getResultSetCacheConfig(sql);
    cacheConfig.setRefreshIntervalMsecs( /* set to some interval */ );
}
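What a refresh interval means for the cached data can be illustrated with a small standalone cache. This is not how the Kernel is implemented: the sketch reloads lazily on read for brevity, whereas the Kernel is described as using dedicated refresh Threads. The class name, the loader and the injected clock are all hypothetical.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Standalone model of a result cache with a refresh interval.
class RefreshingCacheSketch<T> {
    private final Supplier<T> loader;   // stands in for running the SQL Sub-Query
    private final LongSupplier clock;   // injected so the sketch can be tested
    private long refreshIntervalMsecs;
    private T cached;
    private long loadedAt;
    private boolean loaded;

    RefreshingCacheSketch(Supplier<T> loader, LongSupplier clock,
                          long refreshIntervalMsecs) {
        this.loader = loader;
        this.clock = clock;
        this.refreshIntervalMsecs = refreshIntervalMsecs;
    }

    synchronized void setRefreshIntervalMsecs(long refreshIntervalMsecs) {
        this.refreshIntervalMsecs = refreshIntervalMsecs;
    }

    // Returns the cached value, reloading it first if it is too old.
    synchronized T get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= refreshIntervalMsecs) {
            cached = loader.get();
            loadedAt = now;
            loaded = true;
        }
        return cached;
    }

    public static void main(String[] args) {
        RefreshingCacheSketch<String> cache = new RefreshingCacheSketch<>(
                () -> "loaded at " + System.currentTimeMillis(),
                System::currentTimeMillis,
                60_000L);
        // Both reads within the interval return the same cached value.
        System.out.println(cache.get().equals(cache.get())); // true
    }
}
```

A short interval keeps the reference data fresh at the cost of more Database round-trips; a long interval does the opposite.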
Kernel Configuration and Tuning
StreamCruncher allows various aspects of the Kernel to be tuned. The Kernel splits Query processing into stages. Each stage relies on a Thread-pool, and some of these stages are processed by multiple Threads in parallel. Since the Kernel itself is multi-threaded, all the Queries registered in the System are handled in parallel, which helps the Kernel scale well on multi-core/multi-processor Hardware. The Kernel also uses a Connection Pool to interact with the underlying Database.
Some of the properties are specific to the Database being used. There are templates for each of the Databases that are supported by StreamCruncher. These settings can be tuned and the path to the file has to be sent to the API during startup.
The Database Driver, Id and Password are also specified
in this file. If the Database has been configured
to preserve the artifacts such as Tables, Indexes etc
even after a Server restart (if such an option is available)
then the db.preservesartifacts.onshutdown
property in the configuration must be set to true.
This is an indication to the Kernel to drop and
re-create them while restarting. If the Database is
configured to run in a private, embedded mode, then
some such Databases require at least one connection
to be kept open throughout the life of the Program,
otherwise the Database disappears. The
db.privatevolatile.instance
option is used to let the Kernel know that it has to
keep one connection open until it is shutdown.
In addition to the Database Driver, User name and other
properties, the db.schema
can be used to specify the Database Schema that the
Kernel must use to create its internal artifacts in.
The tuning parameters listed below must be carefully tested and measured before they are applied. The same applies to the Query, Input and Output Stream tuning.
The maximum number of Connections that will be
created and held in the Pool -
db.connectionpool.maxsize.
Event Ids that are pumped into the System are
processed by the "instreameventprocessor" module.
The Threads in this module keep checking for new
Events and notify the Query Scheduler module which
is next in line. Every time this Thread is woken up
on Event arrival, it stays awake for some time,
looking for new Events from all registered Streams
and then goes back to sleep for a while. That is
what the 3 options are for -
instreameventprocessor.threads.num,
instreameventprocessor.thread.empty-runs-before-pause,
instreameventprocessor.thread.pause.msecs.
The Scheduling and Execution are also performed
separately and can be configured using these 2 -
queryscheduler.threads.num,
queryrunner.threads.num. Each of these 2 settings
should be close to the total number of Queries that
will be running in the Kernel. Another setting -
jobexecution.threads.num is used to handle the
thread-pool which handles parallel execution of the
Partitions in a Query.
Since StreamCruncher can automatically purge old
Events from the Input Streams, if they have been
configured to, this option -
rowdisposer.thread.pause.msecs
indicates how often this Auto-purge task should be run
by the Kernel.
Since Pre-Filters with Sub-Queries are cached by the
Kernel, the number of Threads that must be used to
maintain/refresh these Cached Queries can be changed
from the default value if the volume of cached data
and the number of unique Queries is large.
cacherefresh.threads.num
should be used to change the number of these
processing Threads.
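For orientation, the properties named in this section can be collected in one place. The standalone sketch below parses them with java.util.Properties from an in-memory string. The property names come from the text above, but every value is a made-up placeholder rather than a recommended or default setting, and the real templates shipped with StreamCruncher contain further Database-specific entries (Driver, User name, Password and so on) that are not shown here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Standalone sketch; it only parses text and never starts a Kernel.
class KernelConfigSketch {
    // Names are from the documentation; all values are placeholders.
    static final String SAMPLE = String.join("\n",
            "db.preservesartifacts.onshutdown=true",
            "db.privatevolatile.instance=false",
            "db.schema=my_schema",
            "db.connectionpool.maxsize=10",
            "instreameventprocessor.threads.num=2",
            "instreameventprocessor.thread.empty-runs-before-pause=5",
            "instreameventprocessor.thread.pause.msecs=100",
            "queryscheduler.threads.num=4",
            "queryrunner.threads.num=4",
            "jobexecution.threads.num=4",
            "rowdisposer.thread.pause.msecs=10000",
            "cacherefresh.threads.num=2");

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Cannot realistically happen for an in-memory reader.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("Connection Pool size: "
                + props.getProperty("db.connectionpool.maxsize"));
        System.out.println("Query runner Threads: "
                + props.getProperty("queryrunner.threads.num"));
    }
}
```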
The Kernel uses the System clock as a reference to check if the Events are going to expire and also to stamp the generated Output Events with the time of occurrence. If the Events arriving at the Kernel are out of sync with the clock of the computer on which the Kernel is running, or if for some reason the Kernel-stamped Events have to be in sync with another system, then the Kernel can be set with a bias that gets added to the System clock's time every time the Kernel needs the latest time.
setTimeBiasMsecs(timeBiasMsecs)
long getTimeBiasMsecs()
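The effect of the bias is a plain addition to the System clock's reading. The standalone sketch below shows that addition; the class is a hypothetical stand-in that only borrows the getter and setter names listed above, and it takes the clock reading as a parameter so the result is easy to verify.

```java
// Standalone model of a biased clock; not the Kernel's implementation.
class TimeBiasSketch {
    private volatile long timeBiasMsecs;

    void setTimeBiasMsecs(long timeBiasMsecs) {
        this.timeBiasMsecs = timeBiasMsecs;
    }

    long getTimeBiasMsecs() {
        return timeBiasMsecs;
    }

    // The bias is added to whatever the System clock reports.
    long biasedTimeMillis(long systemClockMillis) {
        return systemClockMillis + timeBiasMsecs;
    }

    public static void main(String[] args) {
        TimeBiasSketch clock = new TimeBiasSketch();
        // Made-up case: the Event source runs 2.5 seconds behind this machine.
        clock.setTimeBiasMsecs(-2500L);

        long now = System.currentTimeMillis();
        System.out.println("System clock: " + now);
        System.out.println("Biased time:  " + clock.biasedTimeMillis(now));
    }
}
```

A negative bias moves the Kernel's notion of "now" back in time and a positive bias moves it forward.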