
Kinetica + Confluent: A Match Made in Streaming Heaven

Every moment, trillions of entities—vehicles, stock prices, drones, weather events, and beyond—are in constant motion. Imagine the vast opportunities and insights we could uncover by monitoring these objects and detecting pivotal events as they unfold, in real time.

Such a task demands an analytical engine that can ingest high-velocity data streams, execute sophisticated queries to pinpoint critical events and insights, and deliver instantaneous results to be acted upon.

This is precisely the challenge you can address with Kinetica and Confluent. Kinetica is a GPU-accelerated database that excels in complex real-time analysis at scale, while Confluent, built upon Apache Kafka, provides robust data streaming capabilities. Together, they forge a powerful architecture that unlocks the full potential of streaming data.

My aim with this blog is to demonstrate the power of Kinetica and Confluent in action in three simple steps. 

  1. Stream real-time GPS coordinates of 10 vehicles from Confluent into Kinetica. 
  2. Set up a materialized view inside Kinetica that uses spatial functions to detect when these vehicles get within 200 meters of certain areas in Washington DC. This view will continuously update to detect events as new GPS data streams into Kinetica.
  3. Stream geofencing events as they unfold from Kinetica to a Confluent topic for further action.

You can try all of this on your own by uploading this workbook into your free Kinetica instance. All of the data is open for access. You will need a Confluent Cloud account to complete the last step of this demo; you can set one up for free here.

Step 1: Load data from a Kafka topic on Confluent

The simulated GPS coordinates are being streamed into a Kafka topic on Confluent Cloud. Our first task is to register this data source inside Kinetica. For this we need the location of the Confluent cluster, the name of the topic we would like to ingest from, and the credentials required to connect to that cluster. I have used an optional CREATE CREDENTIAL query to store the connection credentials.
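
The workbook linked above contains the exact queries; the sketch below shows roughly what the two registration steps can look like. The broker address, API key, and secret are placeholders, and option names may vary slightly by Kinetica version.

    -- Store the Confluent Cloud API key/secret as a reusable credential object (placeholder values)
    CREATE OR REPLACE CREDENTIAL confluent_cred
    TYPE = 'kafka'
    WITH OPTIONS (
        'security.protocol' = 'SASL_SSL',
        'sasl.mechanism' = 'PLAIN',
        'sasl.username' = '<your Confluent API key>',
        'sasl.password' = '<your Confluent API secret>'
    );

    -- Register the Confluent topic as a data source (same pattern as the data sink in Step 3)
    CREATE OR REPLACE DATA SOURCE vehicle_locations_source
    LOCATION = 'kafka://<your bootstrap server>:9092'
    WITH OPTIONS (
        'kafka_topic_name' = 'vehicle_locations',
        credential = 'confluent_cred'
    );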

Now that the data source for the vehicle_locations topic is created, we can start loading messages from it into a table in Kinetica.

    -- Load messages from the Kafka data source into the vehicle_locations table
    LOAD DATA INTO vehicle_locations
    FROM FILE PATH ''                          -- no file path; the source is a Kafka topic
    FORMAT JSON
    WITH OPTIONS (
        DATA SOURCE = 'vehicle_locations_source',
        SUBSCRIBE = TRUE,                      -- keep listening and ingest new messages as they arrive
        TYPE_INFERENCE_MODE = 'speed',         -- infer column types quickly from a sample of the data
        ERROR_HANDLING = 'permissive'          -- skip malformed records instead of failing the load
    );
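
Because SUBSCRIBE is set to TRUE, the subscription stays open and records keep flowing into the table as new messages land on the topic. A quick spot check (using the columns referenced in Step 2) confirms data is arriving:

    -- Peek at the most recent GPS records to confirm data is flowing
    SELECT *
    FROM vehicle_locations
    ORDER BY TIMESTAMP DESC
    LIMIT 10;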

Step 2: Detect when a vehicle comes close to a spatial boundary in DC

I have created a set of arbitrary ‘fences’ in the Washington DC area. These are stored in the dc_fences table. Our next task is to detect when a vehicle comes within 200 meters of any of the fences shown below.

A set of spatial fences drawn around landmarks in Washington DC
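
The dc_fences table ships with the workbook, but its shape is roughly the following. The column names are taken from the query in this step; the polygon coordinates here are purely illustrative.

    -- Illustrative shape of the dc_fences table (the real table comes with the workbook)
    CREATE TABLE dc_fences (
        fence_label VARCHAR(64),               -- human-readable name of the fenced area
        wkt GEOMETRY                           -- fence polygon stored as WKT
    );

    -- Example fence (coordinates are placeholders, not the actual demo geometry)
    INSERT INTO dc_fences (fence_label, wkt)
    VALUES ('Washington Monument', 'POLYGON((-77.037 38.888, -77.033 38.888, -77.033 38.891, -77.037 38.891, -77.037 38.888))');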

Additionally, we would like to automate event detection so that new events are flagged as they occur. We can do this easily with a materialized view that refreshes whenever a new GPS record streams into the vehicle_locations table in Kinetica from the vehicle_locations topic on Confluent Cloud. Inside the materialized view, we use the spatial function STXY_DWITHIN to check whether a vehicle’s location comes within 200 meters of any of the fences in the dc_fences table.

This is where Kinetica shines: ingesting high-velocity streams of data from Confluent Cloud while simultaneously running complex analytical queries on that data to generate real-time insights, all thanks to our multi-head, lockless architecture.

    CREATE OR REPLACE MATERIALIZED VIEW fence_events 
    REFRESH ON CHANGE AS                         -- recompute whenever new records hit the source tables
    SELECT 
        TRACKID, 
        wkt, 
        x, 
        y, 
        TIMESTAMP, 
        CONCAT(CHAR16(TRACKID), CONCAT(' is at ', fence_label)) AS event_text 
    FROM vehicle_locations, dc_fences
    WHERE 
        STXY_DWITHIN(x, y, wkt, 200, 1) = 1 AND  -- vehicle point within 200 meters of a fence polygon
        TIMESTAMP > NOW() - INTERVAL '10' MINUTE; -- only consider recent GPS records

With just this one simple query, we now have an always-on eventing system that automatically detects spatial proximity events as they occur.
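
To spot-check what the view is picking up, you can query it like any other table; for example, a quick tally of recent events per vehicle:

    -- Count detected proximity events per vehicle over the view's 10-minute window
    SELECT TRACKID, COUNT(*) AS event_count
    FROM fence_events
    GROUP BY TRACKID
    ORDER BY event_count DESC;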

Step 3: Stream events from Kinetica back to a new topic on Confluent

You will need a Confluent Cloud account to complete this step. If you haven’t done so already, go ahead and set up a free account. Creating a new topic takes just a few minutes; you can use the default settings. You will also want to set up an API key that you can use to connect to your Confluent cluster. Head back to Kinetica once you have an API key and a topic set up.

Back in Kinetica, let’s create a data sink that points to the topic we just created. A sink is analogous to a data source; the only difference is that we stream data out to it from Kinetica. You should also create a separate credential object that uses the username and password from your API key (refer to the first query for how to create one).

    CREATE OR REPLACE DATA SINK confluent_sink
    LOCATION = 'kafka://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092'
    WITH OPTIONS (
        'kafka_topic_name' = '<name of your topic>',
        credential = '<Insert your confluent credential object>'
    );
    

The final piece is a data stream that will push any changes in the materialized view we created earlier to the topic we created on Confluent. This stream listens for new records (events) inserted into the materialized view and sends them to the topic.

    -- CREATE A STREAM 
    CREATE STREAM fence_events ON fence_events  
    REFRESH ON CHANGE
    WITH OPTIONS 
    (
        DATASINK_NAME = 'confluent_sink'
    );

Now we can see these new events streaming from Kinetica into Confluent.

Conclusion

As we’ve seen, the combination of Kinetica’s real-time analytical power and Confluent’s robust streaming capabilities opens up new horizons in data analytics. This short demo only scratched the surface of what’s possible.

This combination of Confluent’s streaming capabilities and Kinetica’s complex real-time analytical capabilities can be applied to a myriad of industries and scenarios, from urban planning and logistics to financial markets and emergency response systems. I encourage you to explore the potential of Kinetica and Confluent in your own projects.

If you have questions, thoughts, or experiences you’d like to share, please don’t hesitate to reach out or comment below. Let’s unlock the full potential of your data together.