Efficiently Analyze Infinitely Growing Data with Incremental Queries

Stream data processing, a type of data processing that is designed for infinite data sets, is an efficient way to deal with ever-growing, unbounded data that is increasingly common as an object of analysis for many businesses. In fact, more than 85% of Presto queries on Treasure Data are recurring queries that process growing data sets at scheduled intervals.

This post will walk through the implementation of a type of stream data processing that uses incremental queries to generate intermediate tables that speed up the data analysis of continuously growing data.

Problem at Hand

Suppose you have the IP addresses of your website visitors which you want to convert into countries and then make a daily summary of all visitors by country. You can easily collect such website visitor log data using Treasure Data JavaScript SDK by simply embedding a JavaScript snippet to your website.

For this task, you may be writing queries that look like this:

Then, that daily summary may be used for multitude of other data analytics like this:

You notice that:

  • The entire data of IP address from the first visitor to the most recent one is processed over and over again for each query
  • Each of these queries are taking over an hour to finish

These are the signs that those processes can be sped up by using stream data processing.

The inefficient query that is run repeatedly in this case looks like this:

— inefficient query. Computing everything from scratch
  TD_DATE_TRUNK(‘day’, time, ‘PST’) AS time,
  TD_IP_TO_COUNTRY_CODE(ip) country
FROM visitor_raw

The query does not specify a time frame, and as a result processes the entire data which takes time and slows down all subsequent processes.

Stream Data Processing

The cumbersome process of processing raw data (e.g. IP address to country) can be done much more efficiently by using intermediate tables where only the ‘new’ data (data that came in since the last query) is processed and appended to the rest of the processed data, rather than processing the entire data every time. The process goes:

  1. Create the basis of an intermediate table by processing all the data you have up to a certain point in time
  2. Example: query the IP address of visitors from day 1 up to July 1st 2017 to convert them to countries and output the result to the intermediate table.

  3. Schedule a query that processes new data that came in since the last query and appends to the intermediate table
  4. Example: every day, query the IP address of visitors in the past day to convert them to countries and append the result to the intermediate table

  5. Query the intermediate table for aggregating data
  6. Example: query the intermediate table to create a daily summary of all visitors by country

  7. (Bonus) Create a Treasure Workflow

The process looks like this:

ΔT = IP address of new visitors since the previous query
ΔT’ = country of new visitors since the previous query
T’ = intermediate table

This way, the queries that are run daily (in red) handle much less data than the previous method that queried the entire data set every time. As a result, analyses that used to take over an hour could be shortened to minutes.

Let’s look at each step in detail:

1. Process all data on hand

First we create the basis for the intermediate table by processing the raw data up to a certain point in time – in this case, one day before the current date. This will be the only time the entire data set would be queried.

Sample database: sample_db
Intermediate table: visitor_country
Raw data table: visitor_raw
— create basis for intermediate table
CREATE TABLE visitor_country AS SELECT
  TD_DATE_TRUNK(‘day’, time, ‘PST’) AS time,
  TD_IP_TO_COUNTRY_CODE(ip) country
FROM visitor_raw
WHERE TD_TIME_RANGE(time, null, ‘2017-07-01’, ‘PST’)

2. Schedule and append to intermediate table

Next, we schedule a query that takes in the raw data generated within one day since the last query, and outputs the processed data. The processed new data would be appended to the intermediate table. This will be repeated daily, each time updating the intermediate table with additional processed data since the previous query.

INSERT INTO visitor_country
  TD_DATE_TRUNK(‘day’, time, ‘PST’) AS time,
  TD_IP_TO_COUNTRY_CODE(ip) country
FROM visitor_raw

As a result, the resulting table consists of the website access time and originating country of each website visitor up to the point in time this incremental query is run.

3. Aggregate the intermediate table

The intermediate table can be queried to generate summaries of the data, pushed to data analytics integrations, and more. Since the time-consuming conversion from IP address to country is already being done efficiently on the intermediate table, queries that involved the conversion of IP address to country that took over an hour to process could be sped up to minutes. Here, we create the summary for the number of visitors from each country.

— Aggregation of the intermediate table
SELECT country, approx_distinct(userid) num_users
FROM vistitor_country

4. (Bonus) Create a Treasure Workflow

Steps 2 and 3 can be made into a Treasure Workflow which enables you to define repeatable sets of dependent queries. This way, the incremental data processing and the resulting data export can be organized into a set of scheduled procedure that is less prone to erroneous handling and has reduced end-to-end latency.

Things to Note

The use of incremental query and intermediate tables outlined above hopefully makes intuitive sense – just count the new data and append to the rest rather than count the entire growing data set every time. The deeper world of streaming data processing involves topics such as windowing to account for lags between event time and processing time for event-time-sensitive unbounded data, but that would be a topic for another blog post.

Request a demo
Yoko Sudo
Yoko Sudo
Yoko is a product marketing intern at Treasure Data.
Related Posts