StreamCruncher - Basics
StreamCruncher is a pure Java™ application. It requires Java 1.5 or above; Java 1.6+ is recommended. It is a full-fledged, multi-threaded application that sits on top of a Database and communicates with it using the JDBC Driver provided by that Database. Since there is no native, Database-specific code, for optimum performance the Database should ideally be located on the same machine as StreamCruncher or be accessible over a high-speed LAN. For best performance, an "In-Memory and Embedded" Database should be used. Various Databases are supported, and each provides its own set of features and modes of operation.
StreamCruncher uses Database Tables which are created and maintained automatically by the Kernel and are hidden from the User. These Tables are created and used when Partitions are defined over Input Event Streams, as part of the "Running Query". Once an Input Event Stream is registered in StreamCruncher, Queries which monitor the Event Stream can be registered. For each such "Running Query", there is another Table into which the Query results are written. This Table is also created and maintained automatically by the Kernel. StreamCruncher provides an API that wraps the Kernel and the underlying Database.
In its most basic setting, StreamCruncher might be likened to running an SQL Query in a timed loop over the Database. However, it can do a lot more than that. For starters, if the Query is set up to run against multiple Event Streams (Event Correlation), the Query will be triggered whenever an Event arrives in any of those Input Streams. Note that while the Query is already running, an Event cannot trigger a second instance of the Query.
Event Processing Constructs
One of the most distinguishing features of StreamCruncher is its Window and Partition constructs. Three types of Windows are supported over Event Streams.
1) Sliding Window: A fixed size moving Window
over the Stream - store last N.
2) Time Window: A fixed width Window, where the
"width" refers to the time in
Milliseconds/Seconds/Minutes/Hours after which,
the Event is pushed out of the Window -
store last N milliseconds/seconds/minutes/hours.
Any "SQL Timestamp" column in the Event Table can
be configured in the Time Window as reference, by
using the API to point out the Timestamp column.
A maximum window size can also be specified, which forces the Window to consume at most the specified number of Events at each run. If there is at least one Event that was not included in a run, then the Query will run again immediately after that and expel the first/all Events in the Window to make room for the pending Events, even if they have not yet expired.
3) Tumbling Window: Events stay in the
Window for one cycle only and expire in the next
cycle - store latest N.
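The "store last N" behaviour of a Sliding Window can be pictured with a small, self-contained Java sketch. This is only an illustration of the concept, not StreamCruncher code: the class name SlidingWindowSketch and its methods are hypothetical and do not belong to the StreamCruncher API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of "store last N"; not part of the StreamCruncher API.
final class SlidingWindowSketch<E> {
    private final int capacity;
    private final Deque<E> window = new ArrayDeque<>();

    SlidingWindowSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        this.capacity = capacity;
    }

    /** Adds an Event; returns the Event expelled to make room, or null if none. */
    E add(E event) {
        E expelled = null;
        if (window.size() == capacity) {
            expelled = window.removeFirst(); // the oldest Event leaves the Window
        }
        window.addLast(event);
        return expelled;
    }

    /** Returns a copy of the current Window contents, oldest first. */
    List<E> contents() {
        return new ArrayList<>(window);
    }
}
```

With a capacity of 2, adding "a", "b" and then "c" expels "a" and leaves "b" and "c" in the Window. A Time Window would expel on age rather than on count, and a Tumbling Window would empty itself at the next cycle.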
In addition to the Windows, StreamCruncher allows Events in an Event Stream to be partitioned into smaller Windows, based on a group of Columns in the Event. For example: "Recent 5 Shipping Orders Events at Country -> State -> Department level".
A simple Time Window is defined like this:
partition store last N seconds.
A hierarchy of Windows - partition
by country, state, city store last N seconds.
Apart from defining Partitions, StreamCruncher also supports Aggregates on columns. Several Aggregate functions are supported. Custom Aggregate functions can also be plugged in. An Average Aggregate function defined on a column in the Stream, with a Sliding Window would maintain a Rolling-average on the Events in the Window. Aggregates are akin to Materialized Views in RDBMS, where the Database automatically updates a View.
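As a rough picture of how an Average Aggregate over a Sliding Window yields a rolling average, here is a minimal Java sketch. The class RollingAverageSketch is hypothetical and is not how StreamCruncher implements Aggregates; it only shows the value changing as Events enter and leave the Window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of avg(column) over "store last N"; not StreamCruncher code.
final class RollingAverageSketch {
    private final int capacity;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum;

    RollingAverageSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        this.capacity = capacity;
    }

    /** Adds a column value and returns the average of the values now in the Window. */
    double add(double value) {
        if (window.size() == capacity) {
            sum -= window.removeFirst(); // the expired value no longer counts
        }
        window.addLast(value);
        sum += value;
        return sum / window.size();
    }
}
```

Keeping a running sum avoids re-reading the whole Window on every Event, which is the same idea as a Materialized View being updated incrementally.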
Here is a list of a few features provided by Partitions:
1) A Partition allows a hierarchy of Windows to be created on an Event Stream, just like how the "SQL Group By" clause groups Rows based on the Column values, whereas a simple Window Partition provides just one single Window on the Stream.
2) A Partition also allows Aggregates to be specified for each Window in the hierarchy of Windows. Several Aggregate Functions are supported - Count, Sum, Min, Max, Geometric Mean, Kurtosis, Variance etc.
Aggregates also provide features which prove very
useful while implementing real-world problems,
by means of the pinned
and entrance only
clauses.
Aggregate Functions maintain numbers based on the
column values currently inside the Window. So,
when Events enter the Window or exit the Window
on expiry, the aggregated value changes. This
behaviour might prove to be limiting, in cases
where a simple counter-like functionality is
required.
select src_ip_address, recent_requests, total_requests
from request_events (partition by src_ip_address
store last 30 minutes
with pinned
count(id) as recent_requests,
count(id) entrance only as total_requests)
as denial_attack_check
where denial_attack_check.$row_status is new
and recent_requests > 75;
The example above maintains 2 counters on the
incoming requests to a Website at the Source
IP Address level. One counter (recent_requests)
maintains a count of requests made in the last
30 minutes. When the Event (Request) expires
after 30 minutes, the counter decrements by 1.
There is another counter (total_requests) with
the entrance only
clause, where the counter does not decrement when
the Event expires. It records only the incoming
half of Events. This keeps an account of the
total number of requests made ever since the
Query was setup. However, the Window is a "Time
Based Window" and such Windows expire when all
Events have expired and no new ones arrive. This
would reset the "total_requests" counter every
time such a thing happened. To prevent that from
happening, the pinned
clause is used to hold the Window, even if it
is empty for as long as the Query is running.
The "recent_requests" counter will reduce to 0
when the Window becomes empty, but not the
"total_requests" counter.
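The difference between the two counters can be sketched in plain Java. The sketch below is an assumption-laden illustration for a single Source IP Address: the class RequestCounterSketch, its method names and the use of millisecond timestamps are all hypothetical and are not StreamCruncher's implementation. Keeping the object alive while its Window is empty plays the role of the pinned clause.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of a normal count vs. an "entrance only" count.
final class RequestCounterSketch {
    private final long windowMillis;
    private final Deque<Long> arrivalTimes = new ArrayDeque<>();
    private long totalRequests; // "entrance only": incremented on entry, never decremented

    RequestCounterSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records a request that arrived at the given time. */
    void onRequest(long nowMillis) {
        expire(nowMillis);
        arrivalTimes.addLast(nowMillis);
        totalRequests++;
    }

    /** Drops requests older than the Window; only the recent count is affected. */
    void expire(long nowMillis) {
        while (!arrivalTimes.isEmpty()
                && nowMillis - arrivalTimes.peekFirst() >= windowMillis) {
            arrivalTimes.removeFirst();
        }
    }

    int recentRequests() {
        return arrivalTimes.size();
    }

    long totalRequests() {
        return totalRequests;
    }
}
```

After all requests have expired, recentRequests() returns 0 while totalRequests() still reports every request seen, because the object (the "pinned" Window) was never discarded.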
3) Partitions also provide a feature which makes
specific kinds of Events visible to the
Query, by means of the $row_status is new/dead/not dead
clause. The new status selects Events that got added to the Window in
the current Query cycle, not dead selects new Events and older
Events that are still inside the Window, and dead selects Events
that have just been expelled from the Window.
4) Partitions can have Pre-Filters, which are defined using the Where-clause.
select .. ..
from stream1 (partition by a, b, c
store last 30 minutes
where a > 10 and b = c)
as filtered_stream
where a > 15 and filtered_stream.$row_status is not dead;
As can be seen in the example above, Events can be filtered even before they enter the Partitions. This is extremely useful where the Windows must not be polluted by Events that are not required. It also helps to split the incoming Event Stream and process them in different Queries, which can be thought of as Load-Distribution.
Pre-Filter expressions are limited to basic
operations such as: <, >,
!=, =, *, /, +, -, in (..), not in (..), and, or.
Functions (min, max,
current_timestamp() and others), the like operator,
the exists clause and other such syntax are not
supported. Although the Pre-Filter syntax is
limited, a lot can be accomplished with just these
operators. Since the (not) in
clause is supported, it can either refer to a
hard-coded list of values or a Sub-Query:
select .. ..
from stream1 (partition store last 30 minutes
where a > b and b in (100, 120, 140))
as filtered_stream
where .. .;
By making use of a Sub-Query with the in clause, the Events can
be filtered based on a dynamic condition, in the
example below - a regular Database Table containing
the details of Priority Customers. Since the reference
list is a Table, it can be updated and/or refreshed
without having to stop the Query. This feature
might also be called Content-Based Routing.
select .. ..
from stream1 (partition store last 30 minutes
where customer_id
in (select customer_id from priority_customers))
as filtered_stream
where .. .;
5) Partitions can be Chained to hold and/or expel Events, calculate Aggregates etc., in cases where a single Partition clause on a Stream does not suffice.
select country, num_skus, avg_qty
from
test (partition by country store last 5 seconds
with count(item_sku) as num_skus, avg(item_qty) as avg_qty)
to
(partition by store last 5 seconds
where
$row_status is new
and
(num_skus >= 10 and num_skus <= 30))
as testStr
where testStr.$row_status is new;
The Query above shows the first Partition which maintains 2 aggregates on a 5 second Window, per Country over the Stream; which is then consumed by a second anonymous Partition, which accepts only the latest aggregate values and where the count is between 10 and 30. This is in turn retained in a 5 second Window. Events that make it into the second Partition are retrieved by the Query.
Chained Partitions must always have a Pre-Filter
Where-clause and the first condition must either
be a $row_status is new
or $row_status is dead
clause. If there are additional conditions in the
expression, then they must follow the status clause
after an and.
Ex: where $row_status is new
or a more complex $row_status
is new and ((price > 10.0 and price <= 12.5) or
symbol = 'XYZ'). The Pre-Filter Where-clause
in Chained Partitions must not use the
$row_status other than
at the beginning.
For an Input Event defined as {a, b, c, d, e}, each of the constructs will produce results with different structures:
1) Simple Partition - (partition store last 10). Structure will be {a, b, c, d, e}
3) Partition - (partition by c, d, e store last 10). Structure will still be {a, b, c, d, e}, but Windows will be maintained at "c -> d -> e" level.
4) Aggregated Partition - (partition by c, d store last 8 with avg(a) as avg_a). Structure will be {c, d, avg_a}, and Windows will be maintained at "c -> d" level.
StreamCruncher also provides other Windows in Partitions that are slight variants of the Sliding Window:
a) Random Events Window: This maintains a fixed size Window of Events, where the decision to consume or discard a new Event is made at random. If the Window is full, then a new Event that is allowed to enter the Window expels the oldest Event from the Window, just like in a Sliding Window.
b) Highest/Lowest values Window: This type of fixed
size Window requires an Event property/column to be
specified (using column-x
clause) whose highest/lowest values are used as the
basis to consume/retain/discard Events in the Window.
Highest/Lowest values Windows also provide a unique
feature called the Update Group.
If the Window is
not provided with an Update Group, then each Event
that enters the Window is treated as a unique entry.
The example below holds the sum of quantities sold
by a store over a 30 day period. The Sum, maintained
at "Country -> State -> Category -> SKU"
level is fed to another Partition in the Chain. The
second Partition holds the Top 5 highest selling items
based on the Sums calculated by the first Partition
in the Chain.
select order_country, order_state, order_category,
order_item_sku, order_total_qty
from order_events
(partition by order_country, order_state,
order_category, order_item_sku
store last 30 days with sum(order_quantity) as order_total_qty)
to
(partition by order_country, order_state,
order_category
store highest 5 using order_total_qty
with update group order_country, order_state,
order_category, order_item_sku where $row_status is new)
where order_events.$row_status is not dead;
Every time the Sum changes, the new Sum is sent to
the second Partition. If the
with update group... clause is
not specified, then the updated Sum will be treated as
an unrelated Event and when it enters the Window, assuming
it makes it to the Top 5 and the previous Sum for that
category is already in the Top 5; both the old and the
new Sums will be displayed in the Window. This might not
be the expected behaviour. But if an Update Group is
specified, the values of the columns provided to the
Group (Country, State, Category and SKU) will be used
to generate a Group-Key. If the Window already has another
Event with the same Group-Key, then that value will be
updated with the new value. If the new value cannot make
it to the Top 5, then both the old and new values will
be moved out of the Window. This is the expected behaviour
for the Top-Selling-Items Use Case mentioned above.
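The replace-in-place effect of an Update Group can be illustrated with a small Java sketch. It is a deliberate simplification under stated assumptions: the class TopNUpdateGroupSketch is hypothetical, the Group-Key is modelled as a plain String, and only the "same Group-Key overwrites the older value" behaviour is shown. It does not reproduce how StreamCruncher decides which Events are expelled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of "store highest N ... with update group".
final class TopNUpdateGroupSketch {
    private final int size;
    private final Map<String, Long> window = new HashMap<>();

    TopNUpdateGroupSketch(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be >= 1");
        }
        this.size = size;
    }

    /** Offers a value; an entry with the same Group-Key is updated, not duplicated. */
    void offer(String groupKey, long value) {
        window.put(groupKey, value);
        if (window.size() > size) {
            // Window overflowed: expel the entry with the lowest value.
            Map.Entry<String, Long> lowest = Collections.min(
                    window.entrySet(), Map.Entry.<String, Long>comparingByValue());
            window.remove(lowest.getKey());
        }
    }

    /** Returns a copy of the Window: Group-Key to latest value. */
    Map<String, Long> contents() {
        return new HashMap<>(window);
    }
}
```

Without the Group-Key (for example, if every offer used a fresh key), an updated Sum for the same SKU would sit in the Window next to its older value, which is the situation described above.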
Multi-Stream Correlation/Pattern Matching
Multi-Stream Event Correlation is one of the more
advanced features offered by StreamCruncher. Where
the SQL not exists
clause for 2 Stream Correlation does not suffice,
the alert..when..using..
clause should be used to monitor specific Patterns
across multiple Event Streams.
select stg1_id, stg2_id, stg3_id, priority,
case
when stg2_id is null and stg3_id is not null
then 'Stage 2 missing!'
when stg2_id is not null and stg3_id is null
then 'Stage 3 missing!'
when stg2_id is null and stg3_id is null
then 'Stage 2 & 3 missing!'
else 'All OK!'
end as comment
from
alert
one_event.event_1_id as stg1_id,
two_event.event_2_id as stg2_id,
three_event.event_3_id as stg3_id,
one_event.event_priority as priority
using
stg1_event (partition store last 5 seconds where priority > 5)
as one_event correlate on event_1_id,
stg2_event (partition store last 5 seconds where priority > 5)
as two_event correlate on event_2_id,
stg3_event (partition store last 5 seconds where priority > 5)
as three_event correlate on event_3_id
when
present(one_event and two_event and not three_event) or
present(one_event and not two_event and three_event) or
present(one_event and two_event and three_event)
where priority < 7.5;
This clause is composed of several parts. The first part,
after the alert keyword,
is where columns from different Streams are projected
as a single consolidated Event. The second is the
using clause where the
Partitions are defined which consume Events from
the different Streams being correlated/monitored.
Each Partition ends with a correlate
on .. clause which indicates the Column from
that Stream/Partition to be used to Correlate Events
against the other Streams. This column must be of the
same Data Type in all the Streams. This must also be a
column whose values uniquely identify an Event.
The when .. clause is where
the Patterns to watch for are specified. A pattern is
specified by the present(..)
clause, where the Partition alias is used to indicate
whether Events from that Partition are to be monitored
as part of that Pattern or not. Multiple Patterns can
be specified by separating them with the
or clause. The
when present(..) or present...
syntax follows the Disjunctive Normal Form.
If a Pattern of the form
present(..and not partitionx)
fires, then any column that the
alert clause projects from the
Stream behind partitionx will be null
in the composite Output Event.
The alert clause must be
surrounded by a select
clause which can introduce additional pseudo-columns
like the comment column
shown above, in addition to projecting the columns
provided by the alert
definition. An additional where
clause can be provided after the
alert clause, which acts
as a post-filter on the Patterns that fire.
If a present clause does
not have a not in it, then
that Pattern will trigger as soon as the Event Id
appears in all the Streams defined in that clause.
This can be thought of as an "Immediate Matcher".
If a present clause contains
at least one not in it,
then for that Pattern to fire, the Events in all the
Streams other than the ones with
not in them must appear and stay for at least one
cycle, while the ones with the not
must not appear. While such a condition is asserted,
the Pattern fires when one of the "present" Events
exits from the Partition. So, this can be thought of
as a "Delayed Matcher".
Also, for a Pattern with at least one
not in it, the Pattern must stay "true" throughout
its life, to fire.
Delayed Matcher Examples: For a pattern
present(a and b and not c), the following scenarios
show when it fires.
1) (_,_,_) | a->(a,_,_) | c->(a,_,c)
No fire. b did not appear and c appeared instead.
2) (_,_,_) | a->(a,_,_) | b->(a,b,_) | (_,b,_)->a
Fire as soon as a exits the Partition Window.
3) (_,_,_) | a->(a,_,_) | b->(a,b,_) | (a,b,_) | c->(a,b,c)
No fire. Appearance of c taints the Pattern.
4) (_,_,_) | a->(a,_,_) | b->(a,b,_) | (a,b,_) |
c->(a,b,c) | (a,b,_)->c | (a,b,_)
No fire. Pattern tainted by appearance of c even though
it disappears in a subsequent cycle.
It must also be noted that a Pattern that has fired once will not fire again while any part of it is still true, i.e. if an Event expires and the Pattern gets asserted again before the whole Pattern has expired, it does not fire a second time. However, if the whole Pattern has expired and it then gets asserted again some time later, it will fire again.
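The Delayed Matcher scenarios for present(a and b and not c) can be traced with a small state machine in Java. This is a simplified sketch for a single correlation Id, written only from the rules stated above: the class DelayedMatcherSketch is hypothetical, the "stay for at least one cycle" requirement is not modelled, and the reset after the whole Pattern has expired is left out.

```java
// Hypothetical trace of present(a and b and not c) for one correlation Id.
final class DelayedMatcherSketch {
    private boolean aPresent;
    private boolean bPresent;
    private boolean tainted; // set once c has appeared; never cleared in this sketch
    private boolean fired;   // a Pattern that has fired does not fire again

    /** An Event with this Id arrives on Stream 'a', 'b' or 'c'. */
    void arrive(char stream) {
        switch (stream) {
            case 'a': aPresent = true; break;
            case 'b': bPresent = true; break;
            case 'c': tainted = true; break;
            default: throw new IllegalArgumentException("unknown stream: " + stream);
        }
    }

    /** An Event leaves its Partition; returns true if the Pattern fires now. */
    boolean exit(char stream) {
        if (stream == 'c') {
            return false; // c leaving does not un-taint the Pattern (scenario 4)
        }
        if (stream != 'a' && stream != 'b') {
            throw new IllegalArgumentException("unknown stream: " + stream);
        }
        boolean fires = aPresent && bPresent && !tainted && !fired;
        if (stream == 'a') {
            aPresent = false;
        } else {
            bPresent = false;
        }
        if (fires) {
            fired = true;
        }
        return fires;
    }
}
```

Replaying scenario 2 (a arrives, b arrives, a exits) makes exit('a') return true, while scenarios 3 and 4 return false because the arrival of c has tainted the Pattern.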
Event Weights
Events from each Stream can be assigned different weights. For example, Stream A can be set up such that it triggers the Query only if there are 5 or more Events pending. This might be required where Events from Stream A are very frequent but do not carry very important information. Stream B might be less frequent but might bring in more critical data, so even a single Event from Stream B can be made to trigger the Query. Alternatively, the Query can execute whenever the combined weight of Events across all Streams goes above one. Event weights are, however, ignored by the Query if Time Windows or Tumbling Windows have been defined. In such cases, the Weight cannot stop an Event from getting pushed out of the Window after it has expired, which in turn triggers the Query execution. The Query can also be set up to execute at fixed intervals, which ignores all other settings.
If a Partition is configured to use a Pre-Filter,
i.e. if it has a where
clause as part of the partition
definition, then the Event Weight applies only to those
Events that make it through the Filter.
select .. from ..
(partition by store last ..
where item_sku in (select item_sku from priority_item))
as order_events
where order_events.$row_status is dead;
In the Query above, if the Event Weight is defined as 0.1 and 10 Events arrive, of which only 5 make it through the Filter (Item SKU is in the Priority Item list), then the accumulated weight will be 0.5, which is still less than 1.0, and so the Query will not get triggered.
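The arithmetic of this example can be written out as a short Java sketch. The class EventWeightSketch and its methods are hypothetical, not part of the StreamCruncher API. Treating an accumulated weight of exactly 1.0 as enough to trigger is an assumption made here, based on the earlier "5 or more Events" example; the text itself does not settle that boundary case.

```java
import java.math.BigDecimal;

// Hypothetical illustration of accumulated Event Weights behind a Pre-Filter.
final class EventWeightSketch {
    private EventWeightSketch() {
    }

    /** Weight accumulated by the Events that made it through the Pre-Filter. */
    static BigDecimal accumulatedWeight(int eventsPassingFilter, String weightPerEvent) {
        // BigDecimal avoids binary floating-point rounding when adding up 0.1s.
        return new BigDecimal(weightPerEvent)
                .multiply(BigDecimal.valueOf(eventsPassingFilter));
    }

    /** Assumption: the Query is triggered once the accumulated weight reaches 1.0. */
    static boolean triggers(BigDecimal accumulated) {
        return accumulated.compareTo(BigDecimal.ONE) >= 0;
    }
}
```

For the case above, accumulatedWeight(5, "0.1") is 0.5 and triggers(..) is false, because the 5 Events rejected by the Filter contribute nothing.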
When the Event weights are left at their default values, the Query will execute whenever an Event arrives. If several Events arrive in close succession, then the Query might execute just once, consuming all the Events. This behaviour, however, depends on the type of Window configured over the Stream.
1) If a Sliding Window has been configured on a Stream, then the Query is forced to execute once for each Event because the Sliding Window moves forward one Event at a time
2) If it were a Time Window, then the Query can consume all new Events at once. The Query also executes when an Event expires and has to be expunged from the Window
3) A Tumbling Window would consume the specified number of Events at once and discard them in the next cycle
4) It is also possible for the Query with say, a Sliding Window (A) and Time based Window (B) to run just once if the arrival of an Event in A and an Event expiry in B coincide. Thus, it is possible for "Query trigger conditions" to bunch up
Row status in Partitions
Windowing constructs are useful only if there is
a mechanism to identify Events that entered, left
or are still inside the Window.
The $row_status clause
is useful in several situations:
1) You want to observe the Events that got
expelled from a Window in the Partition in the
current cycle.
($row_status is dead)
2) You want to observe the contents of a Window,
which will include all the Events that are still
inside the Window and the new Events that got
added in the current cycle ($row_status is not dead)
3) You want to correlate the Events that have
been added into the Window in the current cycle
with Events/Rows from another Stream/Table. This is
useful because you have already matched the older Events in the
Window in some previous cycle and do not want
to re-match them, and also because using an
Indexed column on a Table with fewer rows as the
"driving-table" in a Join improves performance
($row_status is new).
The SLA-Failure-Alert example makes best use of
this feature.
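The three statuses can be pictured with a small Java sketch of one Window moving through Query cycles. It is an illustration only: the class RowStatusSketch is hypothetical, Events are reduced to unique String Ids, and the label OLD for "older Events still inside the Window" is a name invented for this sketch (the clause itself only offers new, dead and not dead).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of $row_status over a "store last N" Window.
final class RowStatusSketch {
    enum Status { NEW, OLD, DEAD } // OLD is a made-up label for this sketch

    private final int capacity;
    private final List<String> window = new ArrayList<>();

    RowStatusSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Runs one cycle and returns the status of every Event visible in that cycle. */
    Map<String, Status> cycle(List<String> arrivals) {
        Map<String, Status> statuses = new LinkedHashMap<>();
        for (String id : window) {
            statuses.put(id, Status.OLD);   // already inside from an earlier cycle
        }
        for (String id : arrivals) {
            window.add(id);
            statuses.put(id, Status.NEW);   // added in the current cycle
        }
        while (window.size() > capacity) {
            statuses.put(window.remove(0), Status.DEAD); // expelled in this cycle
        }
        return statuses;
    }

    /** Equivalent of "$row_status is not dead": NEW and OLD Events. */
    static List<String> notDead(Map<String, Status> statuses) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, Status> e : statuses.entrySet()) {
            if (e.getValue() != Status.DEAD) {
                ids.add(e.getKey());
            }
        }
        return ids;
    }
}
```

With a capacity of 2, a first cycle with arrivals "a" and "b" marks both as NEW; a second cycle with arrival "c" marks "a" as DEAD, "b" as OLD and "c" as NEW, so the "not dead" view contains "b" and "c".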
Self Joins in Partitions
Most often, Partitions are created on multiple Streams and they are correlated using some shared characteristics like a common Customer Id, the same Bank transaction Id etc. There are some cases where the Events inside Partitions have to be compared with other Events inside the same Partition. In standard SQL, it is quite common to see such Self-Joins. On Streams, the same concept can be applied to implement some very interesting solutions.
Example: Police reports containing vehicle descriptions of suspects are sent as a continuous Stream of Events. Each report/description of a suspected vehicle is held for 30 minutes. Reports come from different Police jurisdictions. The Stream is Partitioned into several 30 minute Time Based Windows, i.e. each jurisdiction has a separate Partition/Window. Every time a new report comes in, the other Windows (other jurisdictions) are scanned for a similar vehicle description. If one matches, then the results are sent as Query output. So, alerts are raised when suspects drive their vehicles across Police jurisdictions.
It would be very inefficient to define 2 identical
Partitions on the same Stream and perform what is
called a Self-Join in SQL. StreamCruncher provides
the self# clause where a
Partition on a Stream can be referred to with an
alias which can then be used to perform the Self-Join.
select veh_events.state, veh_events.county,
previous_veh_events.state, previous_veh_events.county,
veh_events.veh_make, ....
from veh_report (partition by state, county store last
30 minutes) as veh_events,
self#veh_events as previous_veh_events
where veh_events.$row_status is new
and previous_veh_events.$row_status is not dead
and veh_events.county != previous_veh_events.county
and veh_events.veh_make = previous_veh_events.veh_make
and veh_events.veh_year = previous_veh_events.veh_year
and veh_events.veh_color = previous_veh_events.veh_color
order by previous_veh_events.report_time desc;
The Query above matches new vehicle reports with
older reports that were made from other Counties in
the last 30 minutes. Such a report can help Police
to plan better and respond quicker. By prefixing
the alias of the original Partition with a
self# clause and then
naming the reference with a new alias, the same
Partition can be Joined with itself, thereby helping
to scan and identify Patterns within the same Stream.
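The matching condition of the Self-Join above can be expressed as a short Java sketch. It only mirrors the where-clause for one incoming report; the class SelfJoinSketch and its Report type are hypothetical, and the 30 minute expiry and the ordering by report time are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the Self-Join condition; not StreamCruncher code.
final class SelfJoinSketch {
    static final class Report {
        final String county;
        final String make;
        final int year;
        final String color;

        Report(String county, String make, int year, String color) {
            this.county = county;
            this.make = make;
            this.year = year;
            this.color = color;
        }
    }

    private SelfJoinSketch() {
    }

    /** Returns the reports still in the Window that match the new report from another County. */
    static List<Report> matches(Report incoming, List<Report> window) {
        List<Report> result = new ArrayList<>();
        for (Report previous : window) {
            boolean otherCounty = !previous.county.equals(incoming.county);
            boolean sameVehicle = previous.make.equals(incoming.make)
                    && previous.year == incoming.year
                    && previous.color.equals(incoming.color);
            if (otherCounty && sameVehicle) {
                result.add(previous);
            }
        }
        return result;
    }
}
```

Here the incoming report plays the role of veh_events with $row_status is new, and the list plays the role of previous_veh_events with $row_status is not dead.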