Get it right with streaming data sources and id columns

Theodor Stenevang Klemming
April 2022, Altair RapidMiner

Id columns in streaming data sources

When using a streaming data source in Panopticon Real Time, it is important to use id columns, timestamps and Real-time Limit settings correctly to achieve the right visual display of the data.

In this article, we are looking at a minimal example data source, which is a message topic publishing a new message approximately every 250 milliseconds, so 4 messages per second. In each message, there is just a single data field, called “value”, containing a numeric value that goes from 1 to 8. So, if we list the content of 8 consecutive messages from this topic, we get: 1, 2, 3, 4, 5, 6, 7, 8

These 8 values are published and received during a total of 2 seconds. The topic is continuously publishing messages, looping through these eight values, so after the value 8, we get the value 1 again and so on.

This example topic could come from, for example, Kafka, MQTT, or some other message queue software. If you are using Panopticon Streams, you can set up an example topic like this by following these steps (a script-based alternative is sketched after the list):

  • Create a Streams data source using the Stream Simulator connector. Set Text File Source to “Text” and enter the following as a single column: value,1,2,3,4,5,6,7,8
  • Set Playback Set Size to 1, Playback Interval (ms) to 250 and Real-time Limit (ms) to 200, and check the “Loop” box to make the values repeat over and over.
  • Create a Streams application with an Input node using the data source you created above.
  • Add an output node and set Data Consumer to KafkaOutput.
  • Later, you can insert a Calculation node between the Input and the Output, and add a message_id field or a timestamp field using the clock() function.
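
If you are not using Panopticon Streams, a small script can publish an equivalent topic directly. Below is a minimal sketch of such a publisher; it assumes the kafka-python package, a Kafka broker on localhost:9092 and an example topic name “demo-values”, none of which are part of the setup above.

```python
# Minimal sketch: publish {"value": 1..8} in a loop, one message every 250 ms.
# Assumes the kafka-python package and a broker on localhost:9092; the topic
# name "demo-values" is only an example.
import itertools
import json
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda record: json.dumps(record).encode("utf-8"),
)

for value in itertools.cycle(range(1, 9)):
    producer.send("demo-values", {"value": value})
    time.sleep(0.25)  # roughly 4 messages per second
```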

Example 1: Only a value, no id columns

The simplest thing we can do when connecting to this topic with Panopticon Real Time is to just go with the default settings on the connector. The connector to use will depend on the data source, for example Kafka or MQTT. This will give us a data table with a single column, named “value”. Then we apply this data table to a dashboard, for example in a Visual Table dashboard part. We add the value column to Records of the table three times over, and use a different aggregation method for each: Sum, Mean and Count. This will reveal that:

  • The value sum and value mean are the same and seem to jump once per second, haphazardly between values in the range 1-8.
  • The Count aggregation reveals that there is just 1 value in the data table, and it stays that way, no matter how long we wait.


The reason for this result is that:

  1. The topic contains no data that uniquely identifies each message, so Panopticon will replace the previous value with the new value, and the data table will only ever contain 1 row of data. In the absence of any kind of Id column, each new value is assumed to be a replacement for the previous value.
  2. The default Real-time Limit in the connector settings is 1000 milliseconds. But, as mentioned above, the topic publishes a new message approximately every 250 milliseconds. During the course of 1000 ms, there will be 4 or 5 new messages that get queued up in the Panopticon server. When Panopticon does a data table refresh every 1000 ms, it will insert each queued-up message, in the order received. But, since there is no Id column in the messages, the value in each message will replace the value from the previous message, as mentioned under point (1) above. The result is that the Panopticon data table will get updated with approximately every fourth or fifth message, and only those values will show in the dashboard. The reason why I write “approximately” is that the data table refresh happens after 1000 ms, and four messages pass by in 250+250+250+250 ms. Due to small extra latencies in the few-milliseconds range, the number of messages that reach the data table at a refresh is often 5 rather than 4. It is enough to change the Real-time Limit down from 1000 ms to 900 ms to make sure no more than 4 messages are in the queue each time and to see a stable alternation between, for example, the values 1 and 5.

So, in this scenario, where we don’t have any unique message id field, what can we do to at least see an update in the dashboard for each new message that is sent on the topic? The answer is that we must ingest new values from the queue of messages more frequently: we need to increase the sampling rate. We do this by lowering the Real-time Limit in the Panopticon data connector to an interval which is less than the interval at which the topic is sending messages. Since the topic sends a new message approximately every 250 milliseconds, it is suitable to set the Real-time Limit to 200 ms. This makes the data table refresh often enough to pick up each message sent. As a result, we will see each of the values 1, 2, 3, 4, 5, 6, 7, 8 in the dashboard, changing 4 times per second.
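
To make the effect of the Real-time Limit concrete, here is a plain-Python illustration of the replace-on-refresh behavior described above. It is not Panopticon code, and it ignores latency, so the 1000 ms case alternates cleanly between two values instead of jumping haphazardly as it does in practice.

```python
# The topic publishes a value every 250 ms, the data table refreshes at the
# Real-time Limit, and without an Id column only the newest queued value
# survives each refresh.
import itertools

def visible_values(real_time_limit_ms, publish_interval_ms=250, seconds=4):
    values = itertools.cycle(range(1, 9))   # the looping payload 1..8
    queue, visible = [], []
    next_refresh = real_time_limit_ms
    for t in range(publish_interval_ms, seconds * 1000 + 1, publish_interval_ms):
        queue.append(next(values))          # a new message arrives
        if t >= next_refresh:               # time for a data table refresh
            visible.append(queue[-1])       # only the newest value is kept
            queue.clear()
            next_refresh = t + real_time_limit_ms
    return visible

print(visible_values(1000))  # [4, 8, 4, 8] -> most values never show up
print(visible_values(200))   # [1, 2, ..., 8, 1, 2, ..., 8] -> every value captured
```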

Example 2: Can Panopticon create a unique message id?

When the messages do not contain any message ids, is there any way we can create message ids in Panopticon? The answer is that it depends on the data content in the messages. The option you have is to configure the data connector to use the value column(s) as the Id Column. This will give you a data table where each unique value combination received results in a new row. But in our example topic, our single-field values 1-8 are repeated in a loop, so we will never get more than 8 rows of data in the data table. However, if your messages do contain ever-increasing (growing) or ever-decreasing (shrinking) values, where there is a guarantee that a previously sent value can never re-occur, then you will be able to use the value column(s) as Id Column and get a data table where message after message is added as a new row.
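
The same idea, sketched in plain Python rather than connector settings: keying rows by a repeating value caps the table at 8 rows, while keying by an ever-increasing value keeps adding rows.

```python
# Illustration of using the value itself as Id Column. With a repeating
# payload (1..8 in a loop) the keyed table never grows past 8 rows; with an
# ever-increasing payload every message becomes a new row.
import itertools

def rows_after(messages):
    table = {}                      # Id Column value -> row
    for value in messages:
        table[value] = {"value": value}
    return len(table)

looping = [v for _, v in zip(range(100), itertools.cycle(range(1, 9)))]
increasing = list(range(1, 101))

print(rows_after(looping))     # 8   -> the table stops growing
print(rows_after(increasing))  # 100 -> one row per message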


How can I add a unique message id to my messages?

A simple way of creating a message id field in the messages of your topic is to concatenate a name label that identifies the data source, such as a sensor ID, with the system clock time, for example: “sensorXYZ_2022-01-28T11:12:13.456”. This is easy to do in Panopticon Streams, for example by adding a Calculation in your Streams application. A unique id could also be as simple as an ever-increasing numeric counter value (i.e. 1, 2, 3 etc.).
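
If you build the id outside of Panopticon Streams, for example in the process that publishes the messages, both approaches are a few lines of Python. The sensor name and clock format below are only examples.

```python
# Two simple ways to give each message a unique id: a sensor label combined
# with the system clock, or an ever-increasing counter.
import itertools
from datetime import datetime, timezone

def clock_based_id(sensor_name: str) -> str:
    # e.g. "sensorXYZ_2022-01-28T11:12:13.456"
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
    return f"{sensor_name}_{now}"

counter = itertools.count(1)   # 1, 2, 3, ...

print(clock_based_id("sensorXYZ"))
print(next(counter))
```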

Real-time Limit is less important when you have a message id

When you have a unique identifier for each message – either in a data field as part of each message, or as an effect of having non-repeated, unique values in the messages – then the Real-time Limit setting in the data connector will not have any effect on how many of the messages you will see in your data table. Since each message has a unique identifier, no value will get replaced by a value from a newer message. Instead, all of the messages queued up will get ingested into the data table at each data table refresh (at the Real-time Limit time). Example: With our topic, sending 4 messages per second and a Real-time Limit of 1000 ms, our data table would grow by 4 new rows of data each second, provided that we have a unique identifier for each message.
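
Continuing the plain-Python illustration from Example 1: with a unique id per message, the Real-time Limit only controls how often rows are added in batches, not how many rows end up in the table.

```python
# With a unique message id, every queued message becomes a row at each refresh,
# so after N seconds the table holds N * 4 rows regardless of Real-time Limit.
def rows_after_seconds(seconds, real_time_limit_ms, publish_interval_ms=250):
    rows = 0
    queued = 0
    next_refresh = real_time_limit_ms
    for t in range(publish_interval_ms, seconds * 1000 + 1, publish_interval_ms):
        queued += 1
        if t >= next_refresh:
            rows += queued          # all queued messages are inserted as new rows
            queued = 0
            next_refresh = t + real_time_limit_ms
    return rows

print(rows_after_seconds(10, 1000))  # 40 rows either way: the Real-time Limit
print(rows_after_seconds(10, 200))   # 40 only changes the batching, not the count
```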


Viewing data development over time

Let’s go back to our minimal example data source: the topic that sends 4 messages per second, with a single field in the messages, looping through 1, 2, 3, 4, 5, 6, 7, 8. What should we do to be able to view how these values develop over time? How can we view this data as a timeseries in Panopticon?

Example 3: Timeseries visualization with Automatic Time Id

In the data connectors for streaming data in Panopticon Real Time, there is a setting for Time Id Column. You have the option of setting No Time Id, or selecting a column that contains time values, or using Automatic Time Id.


Our example topic does not contain any timestamps – so let’s investigate what we can do with Automatic Time Id. We’ll start by looking at what happens when we use the default setting of 1000 milliseconds Real-time Limit, and a Time Window of 10 seconds. We set Time Id Column to Automatic Time Id, and then we switch on “Transform to enable time series analysis”. We save the data table, and in a dashboard, we apply this data table to a timeseries visualization: In a timeseries combination graph, we add our column “value” to Visuals and select Bar as the visualization type. We now see how new data comes in, second by second – but we are missing a lot of values. Just like in Example 1 above, our data table is getting only 1 new value per second, because our Real-time Limit is set at 1000 ms. The Automatic Time Id is created and applied at data table refresh – NOT at the point of the server receiving a message. At each data table refresh, only the newest value is added to the data table, and then timestamped with an Automatic Time Id.


To get the value from each of the messages into the data table, we need to ensure that we do a data table refresh often enough to capture each message and timestamp it before another message comes in. We need to increase the sampling rate. We do this by lowering the Real-time Limit, like we did in Example 1. However, with this approach of applying Automatic Time Id values instead of having timestamp values in the messages, we run the risk of seeing irregularities in time values. There will be a risk of occasional irregular data table refresh intervals in the range of plus/minus 100 milliseconds, which will be reflected in the generated timestamps.

Example 4: Automatic Time Id and unique message id

In this example, we will further clarify how Automatic Time Id values are created and applied to the Panopticon data table. We’ll use our example topic that sends messages with values 1-8 in a loop, but extended with a second data field that contains a unique id for each message (for example created by adding a system clock timestamp with millisecond precision to each message; a better use for such a system clock value would be to create an actual timestamp for each message, but we use it as a unique message id for the purpose of this example). We set the Id Column setting in the data connector to our message id column, and we use Automatic Time Id. We enable timeseries transform, and apply the data to a timeseries visualization. We see that thanks to the message id field in the data, no values get replaced by newer values. Instead, the value from each single message is added to the data table. However, the positioning along the time axis reveals that the exact same automatic timestamp is given to more than one message. Since we get a new message every 250 ms, but ingest the queue of messages into the data table only every 1000 ms, between 4 and 5 messages (mostly 5) get the exact same Automatic Time Id value, because they were added to the data table at the same time. This behavior is by design and a limitation that you need to be aware of. An Automatic Time Id value can only be added after each message has been parsed, and parsing does not happen upon message reception but at the Real-time Limit interval. At that point, all messages queued up are parsed at once – and timestamped at once.
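
The same timeline, sketched in plain Python under the idealized assumptions used above (messages every 250 ms, no latency): every message keeps its own row thanks to the id, but all messages flushed at the same refresh share one automatic timestamp.

```python
# Illustration of Automatic Time Id assignment: the id column keeps every
# message as its own row, but the timestamp is the refresh time, not the time
# the message was received.
REAL_TIME_LIMIT_MS = 1000
PUBLISH_INTERVAL_MS = 250

rows, queue = [], []
next_refresh = REAL_TIME_LIMIT_MS
for t in range(PUBLISH_INTERVAL_MS, 3001, PUBLISH_INTERVAL_MS):
    queue.append({"message_id": t, "value": (t // PUBLISH_INTERVAL_MS - 1) % 8 + 1})
    if t >= next_refresh:
        for msg in queue:                         # all queued messages are parsed
            rows.append({**msg, "auto_time": t})  # ...and timestamped at once
        queue.clear()
        next_refresh = t + REAL_TIME_LIMIT_MS

for row in rows:
    print(row)   # messages received at 250..1000 ms all get auto_time 1000, etc.
```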


A word of warning

In a real-life use case, it is not recommended to use streaming messages without timestamps and apply Automatic Time Id if the messages are being published on the topic at sub-second rates. This is because a very low Real-time Limit (frequent data table refresh) is required, which leaves very little margin for latency effects, where random values can “disappear” because the data table refresh did not finish fast enough. The whole round-trip of a single data table refresh includes data queue ingestion and flushing, data timeseries transformation, data transfer from server to web browser client, and dashboard graphics re-rendering. In addition, irregularities can occur in Automatic Time Id values. In a scenario with frequent (sub-second) messages, you should instead take the approach of using timestamp values included in the messages, as described in Example 5 below.

Example 5: Timeseries visualization with Time Id in the data

In this example, we have extended our streaming messages with a second data field, containing a timestamp for the value. Our Panopticon data table gets 2 columns: “value” and “timestamp”. In the Time Id Column setting in the connector, we select the column “timestamp” instead of using Automatic Time Id. As in Example 3, we have set a Time Window of 10 seconds and we have enabled timeseries transform.
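
For reference, here is what the extended messages might look like if you publish them yourself. The sketch makes the same assumptions as the earlier publisher example (kafka-python, a broker on localhost:9092, the example topic name “demo-values”).

```python
# Extended message format: each message carries its own timestamp in addition
# to the value, so Panopticon can use the "timestamp" column as Time Id Column.
import itertools
import json
import time
from datetime import datetime, timezone

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda record: json.dumps(record).encode("utf-8"),
)

for value in itertools.cycle(range(1, 9)):
    message = {
        "value": value,
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
    }
    producer.send("demo-values", message)
    time.sleep(0.25)
```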

With a Real-time Limit of 1000 ms, we are now getting a data table refresh and a dashboard update once every second, and each time, we get 4 new data points on screen. Each data row in our table corresponds to one specific message sent in the topic. In our timeseries visualization, each data point is positioned on the time axis at the time specified in the “timestamp” field of the message.


Our dashboard screen refreshes once every second, but the Panopticon server is receiving data messages as often as they are published, and each message gets added to the data table with no risk of data loss. If we want to make our topic publish messages even more frequently, we can safely do that. The messages will be received, queued up and added to the data table, at the Real-time Limit interval.

If we want to see the dashboard screen refresh more often, we can lower the Real-time Limit a bit – say for example to 500 ms. Try it out and see – too frequent screen updates can be perceptually exhausting to look at and some people even feel seasick after a while.

Recap: Messages without message id or timestamp

When connecting to a topic that publishes messages that contain only values but no message id or timestamp, the value in each new message will replace the value received in the previous message. To ensure that each message is captured and reflected in the dashboard, the Real-time Limit must be less than the interval at which messages are published, but there is no added benefit in a Real-time Limit below half the message publication interval. Regardless of the Real-time Limit, the lack of message id values will make the Panopticon data table contain only 1 row of data. This can be mitigated by selecting all available value columns as Id Column, as long as each message has a unique value (or a unique combination of several value fields).

Recap: Automatic Time Id vs timestamps coming in the messages

There is one very important difference between Automatic Time Id and timestamp values that come in each data message from the data source. The difference is that the Automatic Time Id is added to the data at the point of refreshing the data table in Panopticon. This means that even if there is a unique id per message, all messages queued up will be stamped with the exact same Automatic Time Id. This will make messages published at different times appear as if they were published simultaneously. For this reason, it is an advantage to include timestamp values in the messages. A data source publishing at sub-second rates should always include a timestamp from the source rather than relying on Automatic Time Id.