Big data comprises datasets that are massive, varied, and complex, and that can't be handled with traditional tools. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
If you have followed any tutorial on data science in Python, chances are that you have heard of a library known as Pandas. Pandas provides a dataframe interface for your data: a two-dimensional tabular data structure that "shares many properties with matrices" (in the words of `R`'s definition of dataframes). Dataframes have grown over the years to be an essential component of the modern data scientist's toolkit. In the words of Pandas creator Wes McKinney:

"(pandas) is what people use for data ingest, data prep, and feature engineering for machine learning models. The existence of other database systems that perform equivalent tasks isn't useful if they are not accessible to Python programmers with a convenient API."

The latter part of McKinney's quote is (arguably) why Pandas and other similar models have been adopted by the data science community over more formally grounded models from the database community (e.g., relational tables). While it may be tempting to equate relational tables to dataframes, the two are fundamentally different. In Couchbase (and most other modern databases), data is conceptually represented as unordered (multi)sets of records. The "set of records" abstraction gives databases more opportunities to execute queries efficiently and over larger-than-memory (LTM) data. On the other hand, (R and Pandas) dataframes are more akin to matrices in that the order of rows and columns matters. Data scientists have grown to expect support for operations like transpose (flipping the dataframe over its diagonal) and index-based access (e.g., df.iat[1, 2]), neither of which is as easy (and may be impossible) to express in languages like SQL. For a deeper discussion of the two abstractions, see "Is a Dataframe Just a Table" by Yifan Wu. Regardless of how dataframes have found their way into the mainstream, many Python data scientists simply prefer performing their EDA (exploratory data analysis) with Pandas.
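To make the distinction concrete, here is a minimal sketch using only the Python standard library. The names `table_a`, `table_b`, `grid`, `iat`, and `transpose` are illustrative assumptions, not part of Pandas or Couchbase: the first half treats a table as an unordered multiset of records, and the second half treats it as a grid in which positions matter.

```python
from collections import Counter

# Relational view: a table is an unordered multiset of records,
# so two tables holding the same rows in a different order are equal.
table_a = [("b1", 4.5), ("b2", 3.0), ("b1", 4.5)]
table_b = [("b2", 3.0), ("b1", 4.5), ("b1", 4.5)]
same_relation = Counter(table_a) == Counter(table_b)

# Dataframe view: a grid in which row and column positions are meaningful.
grid = [
    [1, 2, 3],
    [4, 5, 6],
]


def iat(frame, row, col):
    """Positional access, in the spirit of df.iat[row, col]."""
    return frame[row][col]


def transpose(frame):
    """Flip the grid over its diagonal."""
    return [list(column) for column in zip(*frame)]


print(same_relation)       # True: order is irrelevant for a multiset
print(table_a == table_b)  # False: order matters for a positional structure
print(iat(grid, 1, 2))     # 6
print(transpose(grid))     # [[1, 4], [2, 5], [3, 6]]
```

Both positional operations are only well defined once a row order is fixed, which is exactly the guarantee the multiset abstraction gives up.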
In this article, we'll explore how users more comfortable with dataframes can work with their Big Data in Couchbase's Analytics Service. We'll use the Yelp Dataset for our discussion, where we have the following collections: businesses, reviews, and checkins. The setup script can be found here. We will cover how to prepare a set of features with:

- the Couchbase Python SDK and Pandas (the de facto Python dataframe standard);
- the Couchbase Python SDK and Modin (a scalable drop-in Pandas replacement);
- the Couchbase Python SDK and SQL++; and
- AFrame (an in-situ, no-ETL dataframe view for Couchbase collections).

Couchbase and Pandas

To start, let's assume that we want to predict whether a business is still "in business". More specifically, we want to predict the is_open flag of a business. For brevity, we omit the model training details in this article to focus on the feature engineering process. In the snippet below, we use the Couchbase Python SDK to access a local Couchbase cluster hosting an Analytics node. We use simple SQL++ queries to load each dataset into a separate dataframe.

Python
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
import pandas as pd

auth = PasswordAuthenticator(username='admin', password='password')
cluster = Cluster('couchbase://127.0.0.1', ClusterOptions(auth))

# Run an Analytics query and materialize every result row into a dataframe
result_to_df = lambda r: pd.DataFrame(list(cluster.analytics_query(r)))
businesses_df = result_to_df("FROM yelp.businesses b SELECT VALUE b")
checkins_df = result_to_df("FROM yelp.checkins c SELECT VALUE c")
# reviews_df = result_to_df("FROM yelp.reviews r SELECT VALUE r")

Note that the last line is commented out on purpose. (We will handle the reviews collection later.) There are approximately 7 million total reviews in our Yelp dataset, and loading all reviews into memory is more than my laptop can handle.
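The `list(...)` call inside result_to_df holds every row in memory at once. As a hedged sketch of the alternative, the generic helper below consumes any row iterator in fixed-size batches. `iter_batches` and `fake_query_result` are hypothetical names introduced here, and whether a given SDK result object streams its rows lazily is an assumption that should be checked against the SDK documentation.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of at most batch_size rows from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def fake_query_result(n):
    """Hypothetical stand-in for a large query result (not the Couchbase SDK)."""
    for i in range(n):
        yield {"business_id": f"b{i % 3}", "stars": 1 + i % 5}


# Only one batch of rows is alive at any point in this loop.
batch_sizes = [len(batch) for batch in iter_batches(fake_query_result(10), 4)]
print(batch_sizes)  # [4, 4, 2]
```

Each batch could be turned into a small dataframe, reduced, and discarded before the next one is read.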
In general, McKinney suggests Pandas users should have 5-10x more RAM than the size of their dataset. Herein lies the major roadblock data scientists with Big Data must work around: the inability to elegantly reason about larger-than-memory (LTM) data. Our data resides in Couchbase, and to reason about our data in Pandas we need to copy our data from Couchbase into our Python process memory. Even if we had a workstation large enough to hold our data, we must now deal with the problem of staleness. If new records arrive in Couchbase after we build our dataframes, these new records will not be used for the rest of our analysis (or more optimistically, until we rebuild our dataframes).

Having addressed the elephant in the room, let us now continue our feature engineering. The goal for each section (i.e., Couchbase and Pandas, Couchbase and Modin, Couchbase and SQL++, and Couchbase and AFrame) is to find some numeric and tabular representation of all businesses that we can subsequently use as input to an ML model. Suppose that, after some experimentation, we settle on the following numeric features:

1. An n-hot encoding of the top 10 categories for a business.
2. The total number of check-ins a business has.
3. The total number of reviews a business has.
4. The lowest star rating for a review about a given business.
5. The highest star rating for a review about a given business.
6. The average star rating for a review about a given business.
7. The variance in star ratings for reviews about a given business.
8. The skewness in star ratings for reviews about a given business.
9. The kurtosis in star ratings for reviews about a given business.

We will start with item 1 of the list above. To find the top 10 categories shared across all businesses, we 1) convert the comma-separated categories column into a column of lists, 2) "explode/unnest" the categories column to generate a row per list entry, 3) count the distinct values for the categories column, and 4) extract the top 10 values into a set.
Python
top_categories = businesses_df['categories'] \
    .str.split(', ') \
    .explode() \
    .value_counts() \
    .nlargest(10) \
    .keys()
top_categories = set(top_categories)
top_categories

Plain Text
{'Active Life', 'American (New)', 'American (Traditional)', 'Arts & Entertainment', 'Auto Repair', 'Automotive', 'Bakeries', 'Bars', 'Beauty & Spas', ... 'Specialty Food'}

We are now ready to encode the existence of these top 10 categories as 1/0s. Using our businesses dataframe, we 1) convert the comma-separated categories column into a column of lists (again), 2) filter out all categories that are not in our previously found top 10 using a set intersection, 3) "join" the set of categories (row-wise) into a pipe-separated list, and 4) use the get_dummies() function to return a dataframe where each column corresponds to an item in the list from step 3.

Python
ml_input_1 = businesses_df['categories'] \
    .str.split(', ') \
    .apply(lambda a: None if a is None else '|'.join(set(a).intersection(top_categories))) \
    .str.get_dummies()
ml_input_1.head()

Plain Text
   Active Life  American (New)  American (Traditional)  Arts & Entertainment  ...
0            0               0                       0                     0  ...
1            0               0                       0                     0  ...
2            0               0                       0                     0  ...
3            0               0                       0                     1  ...
4            0               0                       0                     0  ...

[5 rows x 10 columns]

The next feature requires us to use the checkins_df dataframe. We 1) join our businesses_df dataframe with the checkins_df dataframe, 2) generate a list-valued column by splitting the comma-separated string-valued field date, and 3) apply the len function to get the number of check-ins. For businesses that do not have a corresponding check-in row, we 4) set their value to 0. The (row-wise) order of ml_input_1 is identical to that of our result (i.e., they share the same "index"), so we can simply use the concat function along the horizontal axis (axis=1).
Python
result = businesses_df \
    .join(checkins_df.set_index('business_id'), on='business_id') \
    ['date'] \
    .str.split(',') \
    .map(lambda v: len(v) if isinstance(v, list) else 0) \
    .rename("Checkin Count")
ml_input_2 = pd.concat([ml_input_1, result], axis=1)
ml_input_2.head()

Plain Text
   Active Life  American (New)  ...  Skin Care  Specialty Food  Checkin Count
0            0               0  ...          0               0            146
1            0               0  ...          0               0             33
2            0               0  ...          0               0             19
3            0               0  ...          0               0              3
4            0               0  ...          0               0              0

[5 rows x 11 columns]

Our last set of features revolves around the distribution of star ratings for a business. We define a function below called summarize, which takes in a list of star ratings and returns various statistics using the scipy.stats.describe function.

Python
import scipy.stats  # import the submodule explicitly; a bare "import scipy" may not load it

def summarize(all_stars):
    d = scipy.stats.describe(all_stars)
    return {
        'Review Count': d.nobs,
        'Minimum Stars': d.minmax[0],
        'Maximum Stars': d.minmax[1],
        'Mean Stars': d.mean,
        'Star Variance': d.variance,
        'Star Skewness': d.skewness,
        'Star Kurtosis': d.kurtosis
    }

As mentioned above, we cannot work with all reviews in memory. Instead, we will use the LIMIT and OFFSET clauses of SQL++ to work with chunks of 1 million review records. Note that this approach is not impervious to data changes on the Couchbase side. A more consistent (and performant) approach requires a duplicated reviews collection (i.e., a third copy of the reviews data). In the snippet below, we end up with a dataframe (stars_df) that is row-wise aligned with the businesses_df dataframe and possesses a list-valued column called stars.
Python
import numpy as np

stars_df = businesses_df[['business_id']] \
    .assign(stars=[np.nan for _ in range(len(businesses_df))])
working_offset = 0
chunk_size = 1000000
while True:
    # Fetch the next chunk of (business_id, stars) pairs
    partial_df_1 = result_to_df(f"""
        FROM yelp.reviews r
        SELECT r.business_id, r.stars
        ORDER BY r.business_id ASC
        LIMIT {chunk_size} OFFSET {working_offset}
    """)
    if len(partial_df_1) == 0:
        break
    working_offset += chunk_size

    # Merge the new chunk with the stars collected so far, one list per business
    partial_df_2 = stars_df.loc[stars_df['stars'].notna()].explode('stars')
    partial_df_3 = pd.concat([partial_df_1, partial_df_2], axis=0) \
        .groupby(['business_id']) \
        .agg(list)
    stars_df = stars_df[['business_id']] \
        .join(partial_df_3, on='business_id')

Now possessing the minimum amount of review information for a given business, our final step is to apply the summarize function and concatenate the result with our existing feature set...

Python
partial_df_4 = pd.json_normalize(stars_df['stars'].apply(summarize))
ml_input_3 = pd.concat([ml_input_2, partial_df_4], axis=1)
ml_input_3.head()

Plain Text
   Active Life  American (New)  American (Traditional)  Arts & Entertainment  \
0            0               0                       0                     0
1            0               0                       0                     0
2            0               0                       0                     0
3            0               0                       0                     1
4            0               0                       0                     0

   ...  Skin Care  Specialty Food  Checkin Count  Review Count  Minimum Stars  \
0  ...          0               0            146            70            1.0
1  ...          0               0             33             5            1.0
2  ...          0               0             19           422            1.0
3  ...          0               0              3             8            3.0
4  ...          0               0              0             9            1.0

   Maximum Stars  Mean Stars  Star Variance  Star Skewness  Star Kurtosis
0            5.0    3.342857       1.619876      -0.494108      -0.624766
1            5.0    2.800000       3.200000       0.035156      -1.581055
2            5.0    4.760664       0.747808      -3.799428      13.180355
3            5.0    4.750000       0.500000      -2.267787       3.142857
4            5.0    3.111111       4.111111      -0.158662      -1.910062

[5 rows x 18 columns]

...and now we are free to use the ml_input_3 dataframe directly as training input for some model in a machine learning library (e.g., scikit-learn)!
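As an aside on the chunked loop above: it keeps every star rating in a list until the final summarize call. A different technique, swapped in here purely for illustration, is Welford's online algorithm, which updates the count, mean, and variance one value at a time so no per-business list is needed. The sketch below uses only the standard library; `RunningStats`, `summarize_chunks`, and the toy `chunks` data are assumptions of this example, and it covers count, min, max, mean, and sample variance only (skewness and kurtosis would need higher-order moments).

```python
import math
import statistics
from collections import defaultdict


class RunningStats:
    """Count, min, max, mean, and sample variance, updated one value at a time."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean
        self.minimum = math.inf
        self.maximum = -math.inf

    def add(self, value):
        # Welford's update step
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (value - self.mean)
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def variance(self):
        # Sample variance (ddof=1), the default reported by scipy.stats.describe
        return self.m2 / (self.count - 1) if self.count > 1 else math.nan


def summarize_chunks(chunks):
    """Fold chunks of (business_id, stars) pairs into per-business statistics."""
    stats = defaultdict(RunningStats)
    for chunk in chunks:
        for business_id, stars in chunk:
            stats[business_id].add(stars)
    return stats


# Toy data: two chunks, with each business's reviews spread across both
chunks = [
    [("a", 5), ("a", 3), ("b", 1)],
    [("a", 4), ("b", 2), ("b", 5)],
]
stats = summarize_chunks(chunks)
print(stats["a"].count, stats["a"].mean, stats["a"].variance)  # 3 4.0 1.0
```

Because each business's state is a handful of numbers, the chunks no longer need to be ordered by business_id, and memory use grows with the number of businesses rather than the number of reviews.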
For completeness, here's a snippet that uses our feature set to train a decision tree classifier:

Python
# Import the submodules explicitly; a bare "import sklearn" does not load them
import sklearn.model_selection
import sklearn.tree

x, y = ml_input_3, businesses_df['is_open']
x_train, x_test, y_train, y_test = \
    sklearn.model_selection.train_test_split(x, y, test_size=0.3, random_state=1)
clf = sklearn.tree.DecisionTreeClassifier()
clf = clf.fit(x_train, y_train)

Couchbase and Modin

The Pandas feature engineering process works best if we are able to fit our data into memory. If we are unable to fit our data into memory (e.g., the reviews dataset), we need to devise specialized solutions to consider our data in chunks. The next three sections delve into cleaner alternatives that do not require us (the data scientists) to consider how large our data is for in-core and out-of-core workloads. The first approach (and the easiest to integrate for existing Pandas users) is a library called Modin. Modin is a drop-in replacement for Pandas that translates Pandas operations into computations executed using a distributed runtime engine (i.e., Ray, Dask, or MPI/Unidist).
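To give a feel for what "translating a dataframe operation into distributed computations" can mean, here is a toy sketch of a partitioned value count using only the standard library. It is an illustration of the general split, compute, and merge idea, not Modin's actual implementation; `partition`, `count_categories`, and `parallel_value_counts` are hypothetical helpers, and threads stand in for a distributed engine.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def partition(rows, n_parts):
    """Split rows into n_parts roughly equal slices."""
    return [rows[i::n_parts] for i in range(n_parts)]


def count_categories(rows):
    """Partial result for one partition: explode and count the categories."""
    counts = Counter()
    for categories in rows:
        if categories:  # skip missing values
            counts.update(categories.split(", "))
    return counts


def parallel_value_counts(rows, n_parts=4):
    """Count each partition independently, then merge the partial counts."""
    with ThreadPoolExecutor(max_workers=n_parts) as pool:
        partials = list(pool.map(count_categories, partition(rows, n_parts)))
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


rows = ["Bars, Nightlife", "Food, Bakeries", None, "Bars, Food", "Food"]
counts = parallel_value_counts(rows, n_parts=2)
print(counts.most_common(2))  # [('Food', 3), ('Bars', 2)]
```

The merge step works because counting is associative: partial counts from any partitioning add up to the same total.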
We will start with three simple steps: 1) initializing our Ray backend, 2) replacing the import pandas as pd line in our first code snippet above with import modin.pandas as pd, and 3) uncommenting the line to load our reviews_df dataframe:

Python
import ray
ray.init()

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
import modin.pandas as pd

auth = PasswordAuthenticator(username='admin', password='password')
cluster = Cluster('couchbase://127.0.0.1', ClusterOptions(auth))

result_to_df = lambda r: pd.DataFrame(list(cluster.analytics_query(r)))
businesses_df = result_to_df("FROM yelp.businesses b SELECT VALUE b")
checkins_df = result_to_df("FROM yelp.checkins c SELECT VALUE c")
reviews_df = result_to_df("FROM yelp.reviews r SELECT VALUE r")

The snippet above moves the data a) out of Couchbase, b) into our Python process memory, and then c) from our Python process memory into Modin's Ray backend. As we will see, the subsequent operations are more performant (thanks to Modin's task parallelism) at the cost of an even more expensive loading step. Furthermore, while out-of-core operations are possible in Modin, Modin delegates this spillage to the operating system (i.e., a workload-unaware component). The next two sections will address how we can simply operate on our data within Couchbase (in-situ) using Couchbase's built-in execution engine, which is built for in-core and out-of-core distributed workloads.

Having addressed the "new elephant in the second room," let us continue with our feature engineering. For all non-review-related features, our code remains nearly the same (there is a bug in the get_dummies() function for Modin, so we defer the computation of the ml_input_1 dataframe to Pandas). While we could keep our hand-crafted solution that chunks reviews (Modin is meant to be a drop-in replacement), we will showcase how Modin allows us to work out-of-core with large dataframes like reviews_df.
In the snippet below, we 1) aggregate all review stars by their business_id values into list-valued columns, 2) apply our summarize function, 3) convert the result of our summarize function into 7 separate columns, 4) join this result (review_summary_df) with our original businesses_df dataframe, and 5) add these 7 new columns to our ml_input_2 dataframe to produce our final ml_input_3.

Python
reviews_agg_df = reviews_df[['business_id', 'stars']] \
    .groupby(['business_id']) \
    .agg(list) \
    .reset_index()
review_stars_df = pd.json_normalize(
    reviews_agg_df['stars'].apply(summarize)
)
review_summary_df = pd.concat([reviews_agg_df, review_stars_df], axis=1) \
    .drop(columns=['stars']) \
    .set_index('business_id')
# Drop the join key afterwards so that only the 7 statistics are appended
business_summary_df = businesses_df[['business_id']] \
    .join(review_summary_df, on='business_id') \
    .drop(columns=['business_id'])
ml_input_3 = pd.concat([ml_input_2, business_summary_df], axis=1)
ml_input_3.head()

Plain Text
   Active Life  American (New)  American (Traditional)  Arts & Entertainment  \
0            0               0                       0                     0
1            0               0                       0                     0
2            0               0                       0                     0
3            0               0                       0                     1
4            0               0                       0                     0

   ...  Skin Care  Specialty Food  Checkin Count  Review Count  Minimum Stars  \
0  ...          0               0            146            70            1.0
1  ...          0               0             33             5            1.0
2  ...          0               0             19           422            1.0
3  ...          0               0              3             8            3.0
4  ...          0               0              0             9            1.0

   Maximum Stars  Mean Stars  Star Variance  Star Skewness  Star Kurtosis
0            5.0    3.342857       1.619876      -0.494108      -0.624766
1            5.0    2.800000       3.200000       0.035156      -1.581055
2            5.0    4.760664       0.747808      -3.799428      13.180355
3            5.0    4.750000       0.500000      -2.267787       3.142857
4            5.0    3.111111       4.111111      -0.158662      -1.910062

[5 rows x 18 columns]

The snippet above is much more elegant than our hand-crafted solution from the previous section. Data scientists are no longer bound by their data size, and data engineers can scale up a data scientist's feature extraction code by configuring the Modin backend. Modin even offers (experimental) support for instantiating Ray clusters on AWS. The impact of data transfer between i) Couchbase, ii) Python, and iii) the Modin backend (Ray) should not be understated.
The snippet above runs slower than our hand-crafted approach because Modin (Ray) will naively keep the large reviews_df dataframe in its memory. If we selectively choose which review fields to pull from Couchbase...

Python
reviews_df = result_to_df("FROM yelp.reviews r SELECT r.business_id, r.stars")

...we observe a ~3x speedup in feature extraction time (3 minutes to 1 minute), but we are no longer working with just the dataframe abstraction.

Couchbase and SQL++

The previous two sections (more or less) cover how data scientists operate on their data using dataframes. In the first section, we illustrated how to work with Big Data in Pandas by hand-writing a chunking process ourselves. In the second section, we illustrated how we can use Modin as a drop-in replacement for Pandas to lower the barrier for data scientists working with Big Data. In this section, we will show how data scientists can work with their Couchbase data in-situ using SQL++ instead of dataframes.

As a reminder, our goal is to find some numeric and tabular representation of all businesses that we can subsequently use as input to an ML model. In the previous sections, this representation was a dataframe. In this section, our goal is to build a SQL++ query that will return a Python list of numeric-valued iterables. First, let us find the top 10 categories for all businesses. The query below a) iterates over all businesses, b) splits the comma-separated string-valued field categories into an array, c) UNNESTs (a SQL++ operation equivalent to the dataframe operation explode) the array we just found, d) uses a GROUP BY clause on the split categories and the aggregate function COUNT(*) to count the number of instances per category, and e) uses the ORDER BY and LIMIT clauses to return the top 10 keys of the groups (the categories).
Python
results = cluster.analytics_query("""
    FROM yelp.businesses b, SPLIT(b.categories, ",") c
    LET clean_c = TRIM(c)
    GROUP BY clean_c
    SELECT VALUE clean_c
    ORDER BY COUNT(*) DESC
    LIMIT 10
""")
top_categories = list(results)
top_categories

Plain Text
['Restaurants', 'Food', 'Shopping', 'Home Services', 'Beauty & Spas', 'Nightlife', 'Health & Medical', 'Local Services', ... 'Skin Care']

Using the categories we just found, we will assemble a list of SELECT clause projections that return 1 if a category is found and 0 otherwise. Our query result list(results) is equivalent (as a list of iterables) to the ml_input_1 dataframe from the previous two sections.

Python
results = cluster.analytics_query(f"""
    FROM yelp.businesses b
    SELECT {','.join(f'TO_BIGINT(CONTAINS(b.categories, "{x}")) AS `{x}`' for x in top_categories)}
""")
list(results)[0]

JSON
{
    "Active Life": 0,
    ...
    "Shopping": 1,
    "Skin Care": 0,
    "Specialty Food": 0
}

We will now build upon the SQL++ query above and add a subquery to compute the total number of check-ins. In the subquery below, we a) iterate over the check-ins collection, b) split and UNNEST the comma-separated string-valued field date, and c) correlate the subquery in the WHERE clause via the conjunct c.business_id = b.business_id. In contrast to standard SQL, SQL++ queries will always return a multiset of records. Consequently, we use the SQL++ function ARRAY_COUNT around our subquery to count the number of check-ins a business has. Our query result list(results) at this stage is equivalent (as a list of iterables) to the ml_input_2 dataframe from the previous two sections.

Python
results = cluster.analytics_query(f"""
    FROM yelp.businesses b
    SELECT {','.join(f'TO_BIGINT(CONTAINS(b.categories, "{x}")) AS `{x}`' for x in top_categories)},
           ARRAY_COUNT((
               FROM yelp.checkins c, SPLIT(c.date, ",") d
               WHERE c.business_id = b.business_id
               SELECT 1
           )) AS `Checkin Count`
""")
list(results)[0]

JSON
{
    ...
    "Caterers": 0,
    "Checkin Count": 146,
    "Chicken Wings": 0,
    ...
}

The last seven features we are interested in pertain to the statistics that we previously computed using SciPy, a Python library. Couchbase users can leverage Python UDFs for Analytics (currently in developer preview) to execute Python code over their data using SQL++, but we will instead write a UDF in SQL++ that replicates the functionality of our summarize function above. Note that ARRAY_VAR_POP computes the population variance, whereas scipy.stats.describe reports the sample variance (ddof=1) by default, so the Star Variance values differ slightly between the two approaches.

SQL
CREATE ANALYTICS FUNCTION yelp.summarize(all_stars) {
    {
        "Review Count": ARRAY_COUNT(all_stars),
        "Minimum Stars": ARRAY_MIN(all_stars),
        "Maximum Stars": ARRAY_MAX(all_stars),
        "Mean Stars": ARRAY_AVG(all_stars),
        "Star Variance": ARRAY_VAR_POP(all_stars),
        "Star Skewness": ARRAY_SKEWNESS(all_stars),
        "Star Kurtosis": ARRAY_KURTOSIS(all_stars)
    }
};

Finally, we will add to our previous SQL++ query to finish our feature-extracting query. We again start with a correlated subquery that JOINs reviews and businesses to return all star ratings associated with a business. Per business, the results of this subquery are (conceptually) given to the yelp.summarize call. To "promote" the results of the summarize call to the main SELECT, we leverage the v.* feature of SQL++ SELECT clause projections.
Our query result list(results) is equivalent (as a list of iterables) to the ml_input_3 dataframe from the previous two sections, and is ready to be used directly as training input for some model in a machine learning library (e.g., scikit-learn):

Python
results = cluster.analytics_query(f"""
    FROM yelp.businesses b
    LET review_features = yelp.summarize((
        FROM yelp.reviews r
        WHERE r.business_id = b.business_id
        SELECT VALUE r.stars
    ))
    SELECT {','.join(f'TO_BIGINT(CONTAINS(b.categories, "{x}")) AS `{x}`' for x in top_categories)},
           ARRAY_COUNT((
               FROM yelp.checkins c, SPLIT(c.date, ",") d
               WHERE c.business_id = b.business_id
               SELECT 1
           )) AS `Checkin Count`,
           review_features.*
""")
list(results)[0]

JSON
{
    "Review Count": 70,
    "Mean Stars": 3.342857142857143,
    "Star Variance": 1.596734693877551,
    "Star Skewness": -0.49410799997654287,
    "Star Kurtosis": -0.624766331689814,
    "Minimum Stars": 1,
    "Maximum Stars": 5,
    "Restaurants": 1,
    "Food": 0,
    "Shopping": 0,
    ...
    "Checkin Count": 146
}

Compared to our previous two sections, this feature collection process runs in about 40 seconds, as opposed to the 1-minute execution time with Modin and the 3-minute execution time with Pandas.

Couchbase and AFrame

The approach above works best for users who prefer the SQL++ abstraction to the dataframe abstraction. All the computation is performed in-situ and there is no expensive loading step into Python. For users who prefer the dataframe abstraction and want to work with data in-situ, AFrame is a viable alternative to using SQL++. In a nutshell, AFrame is a Python library that provides a dataframe syntax for collections in Couchbase Analytics. Behind the scenes, AFrame translates dataframe operations into SQL++ queries that are evaluated lazily, essentially deferring all dataframe operations to Couchbase Analytics itself. While not a drop-in replacement like Modin, AFrame gives data scientists who want the performance of the previous section a Pandas-esque API.
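The idea of lazily translating dataframe operations into a query can be sketched in a few lines. The `LazyFrame` class below is a hypothetical toy, not AFrame's API or its actual translation scheme: each operation merely wraps the previous query text in a new SELECT, and nothing would be sent to a database until an action such as head is called (here, head just returns the query string).

```python
class LazyFrame:
    """Hypothetical lazy dataframe: each operation wraps the previous query."""

    def __init__(self, query):
        self.query = query

    def with_flag(self, column, needle):
        """Add a 1/0 column that is 1 when `column` contains `needle`."""
        expr = f'to_bigint(contains(t.{column}, "{needle}"))'
        return LazyFrame(f"SELECT t.*, {expr} AS `{needle}` FROM ({self.query}) t")

    def head(self, n=5):
        """Return the query text; a real library would execute it at this point."""
        return f"{self.query} LIMIT {n}"


frame = LazyFrame("SELECT VALUE b FROM yelp.businesses b")
frame = frame.with_flag("categories", "Bars").with_flag("categories", "Food")
print(frame.head())
```

Chaining two with_flag calls yields a query nested two levels deep, which hints at why mechanically generated queries tend to be more deeply nested than hand-written ones.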
To start, we will clone the AFrame repository and install AFrame using pip:

Shell
git clone https://github.com/psinthong/AFrame.git
cd AFrame
pip install .

Once installed, we will import AFrame and define a dataframe for all three of our collections using the CBAnalyticsConnector connector.

Python
from aframe import AFrame
from aframe.connector import CBAnalyticsConnector

connector = CBAnalyticsConnector('http://localhost:8095', 'admin', 'password')
businesses_df = AFrame(dataverse='yelp', dataset='businesses', connector=connector)
checkins_df = AFrame(dataverse='yelp', dataset='checkins', connector=connector)
reviews_df = AFrame(dataverse='yelp', dataset='reviews', connector=connector)

We are now ready to define some features! First, let us find the top 10 categories using our businesses_df dataframe. In the snippet below, we 1) convert the comma-separated categories column into a column of lists, 2) "explode/unnest" the categories column to generate a row per list entry, 3) count the distinct values for the categories column, and 4) extract the top 10 values into a set.

Python
top_categories = businesses_df['categories']
top_categories['categories'] = top_categories \
    .map('split', ', ')
top_categories = top_categories \
    .explode('categories') \
    .value_counts('categories') \
    .nlargest(10, 'count') \
    ['categories']
top_categories = set(top_categories)
top_categories

We are now ready to (again) encode the existence of these top 10 categories as 1/0s. The approach we will use with AFrame involves using a for loop to define a column for each category. The Analytics functions used here are contains and to_bigint.
The ml_input_1 dataframe below (or more accurately, ml_input_1.drop('business_id').toPandas()) is equivalent to the ml_input_1 dataframes from the Pandas and Modin sections:

Python
ml_input_1 = businesses_df[['business_id', 'categories']]
for category in top_categories:
    ml_input_1[category] = ml_input_1['categories'] \
        .map('contains', category) \
        .map('to_bigint')
ml_input_1 = ml_input_1.drop('categories')
ml_input_1.head()

Behind the scenes, AFrame has assembled the following query to execute on Couchbase Analytics. Thanks to the composability of SQL++, AFrame (and other tools built on top of Couchbase) can define deeply nested queries like the snippet below.

SQL
SELECT VALUE OBJECT_REMOVE(t, 'categories')
FROM (
    SELECT t.*, to_bigint(contains(categories, "Home Services")) AS `Home Services`
    FROM (
        SELECT t.*, to_bigint(contains(categories, "Bars")) AS `Bars`
        FROM ...
        . . .
    ) t
) t

In the snippet below, we 1) count the number of check-ins per business and 2) merge/join our results into the ml_input_1 dataframe from before to produce a new ml_input_2 dataframe. Again, the ml_input_2 dataframe below is equivalent to the ml_input_2 dataframe objects found in the first two sections.

Python
result = checkins_df
result['date'] = result \
    ['date'] \
    .map('split', ', ') \
    .map('array_count')
ml_input_2 = ml_input_1 \
    .merge(result, left_on='business_id', right_on='business_id', how='left')
ml_input_2.head()

To conclude, we will define our remaining seven features involving the reviews_df dataframe. In the snippet below, we use AFrame grouping and several Analytics aggregate functions that cover the statistics provided by yelp.summarize above.
Python
result = reviews_df[['business_id', 'stars']] \
    .groupby('business_id') \
    .agg({'stars': ['count', 'min', 'max', 'mean', 'var', 'skewness', 'kurtosis']}) \
    .rename({'count_stars': 'Review Count',
             'min_stars': 'Minimum Stars',
             'max_stars': 'Maximum Stars',
             'avg_stars': 'Mean Stars',
             'var_stars': 'Star Variance',
             'skewness_stars': 'Star Skewness',
             'kurtosis_stars': 'Star Kurtosis'})
ml_input_3 = ml_input_2 \
    .merge(result, left_on='business_id', right_on='business_id', how='left') \
    .drop('business_id')
ml_input_3.head()

AFrame was a research project that (unfortunately) lacked the commercial support given to projects like Modin. Consequently, feature extraction with AFrame here takes a little under 2 minutes. While the generated SQL++ query is equivalent (or near-equivalent) to the hand-crafted SQL++ query in the previous section, most databases (including Couchbase Analytics) have historically had trouble optimizing nested queries (see work from Elliott, Cheng, Thomas-Ogbuji, and Ozoyoglu here for research in the context of SPARQL to SQL)...

...that's not to say that AFrame should be abandoned, though: the exercise in this section has shown that AFrame is a very capable "dataframe to SQL++ query" generator. The generated SQL++ queries can then be used to guide the authoring of cleaner SQL++ queries that are executed using the Couchbase Python SDK. While not as clean as Modin, this workflow (AFrame + Couchbase SDK) gives data scientists the resource efficiency of the previous section and the dataframe user model of the first two sections.

Conclusion

It seems we are still in need of that "silver bullet": an API that gives us efficient out-of-core execution with a dataframe user model. In this article, we looked at four different approaches for generating features using data stored in Couchbase Analytics: 1) Pandas, 2) Modin, 3) SQL++, and 4) AFrame.
- Pandas, the de facto dataframe standard for Python, has massive adoption by the data science community but suffers with Big Data;
- Modin, a (near) drop-in replacement for Pandas that allows users to scale their Pandas workflows at the cost of a (potentially) expensive loading step;
- SQL++, a non-dataframe user model that gives users the ability to express in-situ efficient execution of Couchbase data; and
- AFrame, a dataframe wrapper that translates dataframe operations into SQL++ queries.
In today's data-driven world, efficient data processing is paramount for organizations seeking insights and making informed decisions. Google Cloud Platform (GCP) offers powerful tools such as Apache Airflow and BigQuery for streamlining data processing workflows. In this guide, we'll explore how to leverage these tools to create robust and scalable data pipelines.

Setting up Apache Airflow on Google Cloud Platform

Apache Airflow is an open-source platform for orchestrating intricate workflows. It allows developers to define, schedule, and monitor workflows using Directed Acyclic Graphs (DAGs), providing flexibility and scalability for data processing tasks. Setting up Airflow on GCP is straightforward using managed services like Cloud Composer. Follow these steps to get started:

1. Create a Google Cloud Composer environment: Navigate to the Cloud Composer section in the GCP Console and create a new environment. Choose the desired configuration options, such as the number of nodes and machine type.
2. Install additional Python packages: Airflow supports custom Python packages for extending its functionality. In Cloud Composer, you can install additional packages from a requirements.txt file or through the PyPI packages settings of the environment.
3. Configure connections: Airflow uses connection objects to connect to external systems like BigQuery. Configure the necessary connections in Airflow's web interface by providing credentials and connection details.

Designing Data Pipelines With Apache Airflow

Once Airflow is set up, you can design data pipelines using Directed Acyclic Graphs (DAGs). A DAG represents a workflow composed of tasks, where each task performs a specific data processing operation. Here's how to design data pipelines with Airflow:

Define DAGs: Create Python scripts to define DAGs in Airflow. Each DAG script should import the necessary modules and define tasks using operators provided by Airflow, such as BigQueryOperator for interacting with BigQuery.
Python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
# Airflow 1.10-style import path; in Airflow 2.x the BigQuery operators
# are provided by the Google provider package instead
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Define the default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1
}

# Instantiate the DAG object
dag = DAG(
    'bigquery_data_pipeline',
    default_args=default_args,
    description='A DAG for data pipeline with BigQuery tasks',
    schedule_interval='@daily'
)

# Define placeholder tasks
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

# Define BigQuery tasks
bq_query_task1 = BigQueryOperator(
    task_id='bq_query_task1',
    sql='SELECT * FROM your_table',
    destination_dataset_table='your_project.your_dataset.output_table1',
    write_disposition='WRITE_TRUNCATE',
    use_legacy_sql=False,
    dag=dag
)
bq_query_task2 = BigQueryOperator(
    task_id='bq_query_task2',
    sql='SELECT * FROM your_table WHERE date > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)',
    destination_dataset_table='your_project.your_dataset.output_table2',
    write_disposition='WRITE_APPEND',
    use_legacy_sql=False,  # the query above uses standard SQL syntax
    dag=dag
)

# Define task dependencies
start_task >> bq_query_task1 >> bq_query_task2 >> end_task

In this example:

- We define a DAG named bigquery_data_pipeline with a daily schedule interval using the schedule_interval parameter set to '@daily'.
- Two dummy tasks (start_task and end_task) are defined using DummyOperator. These tasks serve as placeholders and are not associated with any actual processing.
- Two BigQuery tasks (bq_query_task1 and bq_query_task2) are defined using BigQueryOperator. These tasks execute SQL queries on BigQuery and store the results in destination tables.
Each BigQueryOperator specifies the SQL query to be executed (sql parameter), the destination table (destination_dataset_table parameter), and the write disposition (write_disposition parameter).
Task dependencies are defined such that bq_query_task1 must run before bq_query_task2, and both must run between start_task and end_task.

By defining DAGs in this manner, you can create robust data pipelines in Apache Airflow that interact with BigQuery for data processing and analysis. Adjust the SQL queries and destination tables as needed to suit your specific use case.

Configure task dependencies: Specify task dependencies within DAGs to ensure proper execution order. Airflow allows you to define dependencies using the set_upstream and set_downstream methods; task1.set_downstream(task2) has the same effect as task1 >> task2.

Python
# Define tasks (reuses the dag object and the DummyOperator import from the previous example)
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task4 = DummyOperator(task_id='task4', dag=dag)

# Set task dependencies
task1.set_downstream(task2)
task1.set_downstream(task3)
task2.set_downstream(task4)
task3.set_downstream(task4)

In this example:
The tasks are attached to an existing DAG object (dag), for example one named sample_dag with a daily schedule interval. The snippet itself does not create the DAG.
Four tasks (task1, task2, task3, task4) are defined using DummyOperator, which represents placeholder tasks.
Task dependencies are configured using the set_downstream method. In this case, task2 and task3 are downstream of task1, and task4 is downstream of both task2 and task3.

This setup ensures that task1 is executed first, followed by task2 and task3, which do not depend on each other and can therefore run in parallel. Finally, task4 is executed after both task2 and task3 have completed.

Set task schedules: Configure task schedules within DAGs to control when they should be executed. Airflow supports various scheduling options, including cron expressions and interval schedules.
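To make the scheduling options mentioned above more concrete, here is a minimal sketch in plain Python (no Airflow required). It assumes the commonly documented behavior that a scheduled run covers a time interval and is only started once that interval has ended. The helper daily_runs and the PRESET_TO_CRON table are hypothetical names introduced for illustration; this is a simplified model, not Airflow's scheduler code.

```python
from datetime import datetime, timedelta

# Cron equivalents of two common schedule presets.
PRESET_TO_CRON = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
}


def daily_runs(start_date, count):
    """Return (logical_date, earliest_start) pairs for a daily schedule.

    Each run covers the interval [logical_date, logical_date + 1 day)
    and is assumed to start only after that interval has ended.
    """
    one_day = timedelta(days=1)
    runs = []
    for i in range(count):
        logical_date = start_date + i * one_day
        runs.append((logical_date, logical_date + one_day))
    return runs


runs = daily_runs(datetime(2024, 3, 3), 3)
for logical_date, earliest_start in runs:
    print(
        f"run for {logical_date:%Y-%m-%d} "
        f"starts no earlier than {earliest_start:%Y-%m-%d %H:%M}"
    )
```

Under this model, a DAG with a start date of March 3 and a daily schedule does not run on March 3 itself; its first run begins once the March 3 interval is complete.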
Python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Airflow attaches the schedule to the DAG, not to individual tasks.
# The cron expression starts one run per day at 10:00.
dag = DAG(
    'sample_scheduled_dag',
    start_date=datetime(2024, 3, 3),
    schedule_interval='0 10 * * *',
    catchup=False  # do not backfill runs for past dates
)

task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)

# Define task dependencies: each task starts once its upstream task has succeeded
task1.set_downstream(task2)
task2.set_downstream(task3)

In this example:
We create a DAG named sample_scheduled_dag whose schedule_interval is the cron expression '0 10 * * *', so a run starts every day at 10:00. The '@daily' preset used earlier is shorthand for a run at midnight.
The schedule belongs to the DAG. Assigning an execution_date attribute to a task object does not change when that task runs, so the tasks are ordered through dependencies instead: task2 is downstream of task1, and task3 is downstream of task2.
If a task must wait a fixed amount of time after another one (for example, one hour after task1), that delay has to be modeled explicitly, for instance with a sensor task or with a separate DAG on its own schedule.

By configuring the schedule on the DAG and the dependencies between its tasks, you control when each run starts and in which order its tasks execute, allowing for precise orchestration of data processing workflows in Apache Airflow.

Integrating With BigQuery for Data Processing

BigQuery, offered by Google Cloud, is a fully managed and serverless data warehouse solution. It offers high-performance SQL queries and scalable storage for analyzing large datasets. Here's how to integrate BigQuery with Apache Airflow for data processing:

Execute SQL queries: Using the BigQueryOperator, you can execute SQL queries on BigQuery as part of your Apache Airflow DAGs, as shown in the first example. Adjust the SQL queries and destination tables as needed to match your specific requirements.

Load and export data: Airflow allows you to load data into BigQuery from external sources or export data from BigQuery to other destinations.
Use operators like BigQueryToBigQueryOperator and BigQueryToGCSOperator for data loading and exporting operations.

Python
# Import path depends on your Airflow version
from airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator

# Copy a table from another project's dataset into your own dataset
bq_load_external_data_task = BigQueryToBigQueryOperator(
    task_id='bq_load_external_data',
    # This operator takes one or more source tables (note the plural parameter name)
    source_project_dataset_tables='external_project.external_dataset.external_table',
    destination_project_dataset_table='your_project.your_dataset.internal_table',
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    dag=dag
)

# Export the table to Google Cloud Storage (GCS) as CSV
bq_export_to_gcs_task = BigQueryToGCSOperator(
    task_id='bq_export_to_gcs',
    source_project_dataset_table='your_project.your_dataset.internal_table',
    destination_cloud_storage_uris=['gs://your_bucket/your_file.csv'],
    export_format='CSV',
    dag=dag
)

# Define task dependencies
start_task >> bq_load_external_data_task >> bq_export_to_gcs_task >> end_task

Monitor and manage jobs: Airflow provides built-in monitoring and logging capabilities for managing BigQuery jobs. Monitor job statuses, view logs, and handle job failures using Airflow's web interface or command-line tools. Here's how you can effectively monitor and manage BigQuery jobs in Airflow:

1. Airflow Web Interface
DAG Runs Page: The Airflow web interface provides a "DAG Runs" page where you can view the status of each DAG run. This includes information on whether the DAG run succeeded, failed, or is currently running.
Task Instance Logs: You can access logs for each task instance within a DAG run. These logs provide detailed information about task execution, including any errors or exceptions encountered.
Graph View: The graph view in the Airflow UI provides a visual representation of the DAG and its task dependencies. You can use this view to understand the workflow and identify any bottlenecks or issues.

2.
Command-Line Interface (CLI)
airflow dags list: Use the airflow dags list command to list all available DAGs in your Airflow environment, with basic information about each one.
airflow dags show: The airflow dags show command displays the structure of a specific DAG, including its tasks and their dependencies.
airflow tasks list: Use the airflow tasks list command to list all tasks defined within a specific DAG.
Task logs: To troubleshoot a specific task, run it with the airflow tasks test command, which executes a single task instance and prints its log output to the console. Logs of scheduled runs are available in the web interface and in the configured log location.

3. Logging and Alerts
Airflow logging: Airflow logs all task executions and DAG runs, making it easy to track job progress and identify issues. You can configure logging levels and handlers to control the verbosity and destination of logs.
Alerting: Configure alerts and notifications to be triggered based on specific events, such as task failures or DAG run statuses. You can use tools like Slack, email, or PagerDuty to receive alerts and take appropriate actions.

4. Monitoring Tools
Cloud Monitoring: If you're running Airflow on Google Cloud Platform, you can use Cloud Monitoring (formerly Stackdriver Monitoring) to monitor the health and performance of your Airflow environment. This includes metrics such as CPU usage, memory usage, and task execution times.
Prometheus and Grafana: Integrate Airflow with Prometheus and Grafana for advanced monitoring and visualization of performance metrics. This allows you to create custom dashboards and gain insights into the behavior of your Airflow jobs.
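The alerting idea above can be sketched as a small callback. Airflow operators accept an on_failure_callback argument that is called with a context dictionary when a task fails. The sketch below assumes that this context exposes a task_instance object (with task_id, dag_id, and try_number attributes) and an exception entry. The function names and the fake context are hypothetical and only serve to make the example runnable without Airflow; a real callback would pass the message to Slack, email, or PagerDuty instead of print.

```python
from types import SimpleNamespace


def build_failure_alert(context):
    """Build a short alert message from an Airflow-style callback context."""
    ti = context["task_instance"]
    return (
        f"Task {ti.task_id} in DAG {ti.dag_id} failed "
        f"on try {ti.try_number}: {context.get('exception')}"
    )


def notify_on_failure(context, send=print):
    """Send the alert through the given channel (print by default)."""
    send(build_failure_alert(context))


# Simulated context, standing in for what the scheduler would pass in.
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="bq_query_task1", dag_id="bigquery_data_pipeline", try_number=2
    ),
    "exception": ValueError("query failed"),
}
notify_on_failure(fake_context)
```

In a DAG, such a function would be passed as on_failure_callback=notify_on_failure, either on a single operator or through default_args so that it applies to every task.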
By leveraging these monitoring and management capabilities provided by Apache Airflow, you can effectively monitor job statuses, view logs, and handle job failures, ensuring the reliability and efficiency of your data workflows, including those involving BigQuery. Best Practices for Streamlining Data Processing To ensure efficient data processing workflows on Google Cloud Platform, consider the following best practices: 1. Optimize Query Performance Use efficient SQL queries: Craft SQL queries that leverage BigQuery's capabilities efficiently. Optimize joins, aggregations, and filtering conditions to minimize data scanned and improve query performance. Leverage partitioning and clustering: Partition tables based on frequently filtered columns to reduce query costs and improve query performance. Utilize clustering to organize data within partitions for further optimization. Utilize query caching: Take advantage of BigQuery's caching mechanism to avoid redundant computation. Reuse cached results for identical queries to reduce query execution time and costs. 2. Scale Resources Dynamically Auto-scaling: Configure Airflow and associated resources to scale automatically based on workload demands. Use managed services like Cloud Composer on GCP, which can automatically scale Airflow clusters based on the number of active DAGs and tasks. Preemptible VMs: Utilize preemptible VMs (preemptible instances) for batch processing tasks that can tolerate interruptions. Preemptible VMs are cost-effective and can significantly reduce resource costs for non-critical workloads. 3. Implement Error Handling Task retries: Configure Airflow tasks to retry automatically upon failure. Use exponential backoff strategies to gradually increase retry intervals and avoid overwhelming downstream services. Error handling mechanisms: Implement robust error handling mechanisms within data pipelines to handle transient errors, network issues, and service interruptions gracefully. 
Utilize Airflow's built-in error handling features like on_failure_callback to execute custom error handling logic.
Monitoring alerts: Set up monitoring alerts and notifications to proactively detect and respond to pipeline failures. Use GCP services such as Cloud Monitoring and Cloud Logging (formerly Stackdriver Logging) to monitor Airflow task execution and trigger alerts based on predefined conditions.

4. Monitor and Tune Performance
Performance metrics monitoring: Monitor pipeline performance metrics, including query execution time, data processing throughput, and resource utilization. Use GCP's monitoring tools to track performance metrics in real time and identify performance bottlenecks.
Fine-tune configurations: Regularly review and fine-tune pipeline configurations based on performance monitoring data. Optimize resource allocation, adjust parallelism settings, and tweak query parameters to improve overall performance.
Capacity planning: Perform capacity planning exercises to ensure that resources are provisioned optimally to meet workload demands. Scale resources up or down as needed based on historical usage patterns and projected growth.

Conclusion

By leveraging Apache Airflow and BigQuery on Google Cloud Platform, developers can streamline data processing workflows and build scalable data pipelines for analytics and decision-making. Follow the guidelines outlined in this developer guide to design efficient data pipelines, integrate with BigQuery, and implement best practices for optimizing performance and reliability. With the right tools and practices in place, organizations can unlock the full potential of their data assets and drive business success in the cloud.
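As a footnote to the retry advice in the error handling practices above, the following sketch shows what an exponential backoff schedule looks like. It is plain Python for illustration only: backoff_delays is a hypothetical helper, and Airflow's own calculation (enabled through operator arguments such as retry_delay, retry_exponential_backoff, and max_retry_delay) may differ in detail.

```python
from datetime import timedelta


def backoff_delays(base, retries, cap):
    """Return the wait before each retry, doubling every time up to cap."""
    return [min(base * (2 ** attempt), cap) for attempt in range(retries)]


delays = backoff_delays(
    base=timedelta(minutes=1), retries=5, cap=timedelta(minutes=10)
)
for attempt, delay in enumerate(delays, start=1):
    print(f"retry {attempt}: wait {delay}")
```

The cap matters in practice: without it, a task with many retries could end up waiting hours between attempts, which delays failure alerts as well as recovery.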
The modern data stack represents the evolution of data management, shifting from traditional, monolithic systems to agile, cloud-based architectures. It's designed to handle large amounts of data, providing scalability, flexibility, and real-time processing capabilities. This stack is modular, allowing organizations to use specialized tools for each function: data ingestion, storage, transformation, and analysis, facilitating a more efficient and democratized approach to data analytics and business operations. As businesses continue to prioritize data-driven decision-making, the modern data stack has become integral to unlocking actionable insights and fostering innovation. The Evolution of Modern Data Stack The Early Days: Pre-2000s Companies use big, single systems to keep and manage their data. These were good for everyday business tasks but not so much for analyzing lots of data. Data was stored in traditional relational databases like Oracle, IBM DB2, and Microsoft SQL Server. The Big Data Era: Early 2000s - 2010s This period marked the beginning of a shift towards systems that could handle massive amounts of data at high speeds and in various formats. We started to see a lot more data from all over, and it was coming in fast. New tech like Hadoop helped by spreading out the data work across many computers. The Rise of Cloud Data Warehouses: Mid-2010s Cloud computing started to revolutionize data storage and processing. Cloud data warehouses like Amazon Redshift and Google BigQuery offered scalability and flexibility, changing the economics and speed of data analytics. Also, Snowflake, a cloud-based data warehousing startup, emerged, offering a unique architecture separating computing and storage. The Modern Data Stack: Late 2010s - Present The modern data stack took shape with the rise of ELT processes, SaaS-based data integration tools, and the separation of storage and compute. 
This era saw the proliferation of tools designed for specific parts of the data lifecycle, enabling a more modular and efficient approach to data management.

Limitations of Traditional Data Systems

In my data engineering career, across several organizations, I've worked extensively with Microsoft SQL Server. This section draws on those experiences, providing a personal touch as I recount the challenges faced with this traditional system. Later, we'll explore how the Modern Data Stack (MDS) addresses many of these issues; some solutions were quite a revelation to me!

Scalability

Traditional SQL Server deployments were often hosted on-premises, which meant that scaling up to accommodate growing data volumes required significant hardware investments and could lead to extended downtime during upgrades. What's more, when we had less data to deal with, we still had all this extra hardware that we didn't really need, and we were still paying for it. It was like paying for a whole bus when you only need a few seats.

Complex ETL

SSIS was broadly used for ETL. While it is a powerful tool, it had certain limitations, especially when compared to more modern data integration solutions. Notably, Microsoft has since addressed many of these limitations with Azure Data Factory and SQL Server Data Tools (SSDT).

API calls: SSIS initially lacked direct support for API calls. Custom scripting was required to interact with web services, complicating ETL processes.
Memory allocation: SSIS jobs needed careful memory management. Without enough server memory, complex data jobs could fail.
Auditing: Extensive auditing within SSIS packages was necessary to monitor and troubleshoot, adding to the workload.
Version control: Early versions of SSIS presented challenges with version control integration, complicating change tracking and team collaboration.
Cross-platform accessibility: Managing SSIS from non-Windows systems was difficult, as it was a Windows-centric tool.
Maintenance Demands

The maintenance of on-premises servers was resource-intensive. I recall the significant effort required to ensure systems were up-to-date and running smoothly, often involving downtime that had to be carefully managed.

Integration

Integrating SQL Server with newer tools and platforms was not always straightforward. It sometimes required creative workarounds, which added to the complexity of our data architecture.

How the Modern Data Stack Solved My Data Challenges

The Modern Data Stack (MDS) fixed a lot of the old problems I had with SQL Server. Now, we can use the cloud to store data, which means no more spending on big, expensive servers we might not always need. Getting data from different places is easier because there are tools that do it all for us, with no more tricky coding. When it comes to sorting and cleaning up our data, we can do it directly in the database with simple commands. This avoids the headaches of managing big servers or digging through tons of data to find a tiny mistake. And when we talk about keeping our data safe and organized, the MDS has tools that make this super easy and way less of a chore.

So with the MDS, we're saving time, we can move quicker, and it's a lot less hassle all around. It's like having a bunch of smart helpers who take care of the tough stuff so we can focus on the cool part: finding out what the data tells us.

Components of the Modern Data Stack

MDS is made up of various layers, each with specialized tools that work together to streamline data processes.

Data Ingestion and Integration
The extraction and loading of data from diverse sources, including APIs, databases, and SaaS applications.
Ingestion tools: Fivetran, Stitch, Airbyte, Segment, etc.

Data Storage
Modern cloud data warehouses and data lakes offer scalable, flexible, and cost-effective storage solutions.
Cloud data warehouses: Google BigQuery, Snowflake, Redshift, etc.
Data Transformation
Tools like dbt (data build tool) enable transformation within the data warehouse using simple SQL, improving upon traditional ETL processes.

Data Analysis and Business Intelligence
Analytics and business intelligence tools allow for advanced data exploration, visualization, and sharing of insights across the organization.
Business intelligence tools: Tableau, Looker, Power BI, GoodData

Data Extraction and Reverse ETL
Reverse ETL enables organizations to operationalize their warehouse data by moving it back into business applications, driving action from insights.
Reverse ETL tools: Hightouch, Census

Data Orchestration
Platforms that help automate and manage data workflows, ensuring that the right data is processed at the right time.
Orchestration tools: Airflow, Astronomer, Dagster, AWS Step Functions

Data Governance and Security
Data governance covers managing data access, ensuring compliance, and protecting data within the MDS. It also provides an organized inventory of data assets, which enhances discoverability and trustworthiness.
Data catalog tools: Alation (for data cataloging), Collibra (for governance and cataloging), Apache Atlas

Data Quality
Ensures data reliability and accuracy through validation and cleaning, providing confidence in data-driven decision-making.
Data quality tools: Talend, Monte Carlo, Soda, Anomalo, Great Expectations

Data Modeling
Assists in designing and iterating database schemas easily, supporting agile and responsive data architecture practices.
Modeling tools: erwin, SqlDBM

Conclusion: Embracing MDS With Cost Awareness

The Modern Data Stack is pretty amazing; it's like having a Swiss army knife for handling data. It definitely makes things faster and less of a headache. But while it's super powerful and gives us a lot of cool tools, it's also important to keep an eye on the price tag.
The pay-as-you-go pricing of the cloud is great because we only pay for what we use. But, just like a phone bill, if we're not careful, those little things can add up. So, while we enjoy the awesome features of the MDS, we should also make sure to stay smart about how we use them. That way, we can keep saving time without any surprises when it comes to costs.
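As a small footnote to the transformation layer described above, here is a minimal sketch of the ELT pattern: raw data is loaded first and then cleaned inside the database with plain SQL. It uses Python's built-in sqlite3 module as a stand-in for a cloud data warehouse, and the table names, column names, and sample rows are made up for illustration. A tool like dbt manages this kind of SQL as versioned models, which this sketch does not attempt to reproduce.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Extract + Load: land the source rows untouched in a raw table.
conn.execute(
    "CREATE TABLE raw_orders (order_id INTEGER, customer TEXT, amount TEXT)"
)
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [
        (1, " alice ", "10.50"),
        (2, "BOB", "4.25"),
        (3, "alice", "n/a"),  # unusable amount, filtered out below
        (4, "Bob", "5.75"),
    ],
)

# Transform: clean and aggregate inside the database with plain SQL.
conn.execute(
    """
    CREATE TABLE customer_totals AS
    SELECT LOWER(TRIM(customer)) AS customer_name,
           SUM(CAST(amount AS REAL)) AS total_amount
    FROM raw_orders
    WHERE amount GLOB '[0-9]*'
    GROUP BY LOWER(TRIM(customer))
    """
)

totals = conn.execute(
    "SELECT customer_name, total_amount FROM customer_totals ORDER BY customer_name"
).fetchall()
print(totals)
```

Because the raw table is kept as loaded, the cleaning rules can be changed and the transformation rerun without going back to the source system, which is one of the main practical differences from transforming data before loading it.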
The research company Forrester defines data streaming platforms as a new software category in a new Forrester Wave. Apache Kafka is the de facto standard used by over 100,000 organizations. Plenty of vendors offer Kafka platforms and cloud services. Many complementary open-source stream processing frameworks like Apache Flink and related cloud offerings emerged. Competitive technologies like Pulsar, Redpanda, or WarpStream try to get market share by leveraging the Kafka protocol. This blog post explores the data streaming landscape of 2024 to summarize existing solutions and market trends. The end of the article gives an outlook on potential new entrants in 2025. Join the data streaming community and stay informed about new blog posts by subscribing to my newsletter. Data Streaming Is a New Software Category Real-time data beats slow data. That's true across almost all use cases in any industry. Event-driven applications powered by data streaming are the new black. This approach increases the business value as the overall goal by increasing revenue, reducing cost, reducing risk, or improving the customer experience. Plenty of software categories and related data platforms exist to process and analyze data: Database: Store and execute transactional workloads. Data Warehouse: Processing structured historical data to create recurring reports and unique insights. Data Lake: Processing structured and semi- or unstructured big data sets with batch processing to create recurring reports and unique insights. Lakehouse: A mix of data warehouse and data lake to process all data on one platform. Data Streaming: Continuously process data in motion and provide data consistency across communication paradigms (like real-time, batch, and request-response) instead of storing and analyzing data only at rest. Of course, these data platforms often overlap a bit. I did a complete blog series exploring the use cases and how they complement each other. Data Warehouse vs. Data Lake vs. 
Data Streaming – Friends, Enemies, Frenemies? Data Streaming for Data Ingestion into the Data Warehouse and Data Lake Data Warehouse Modernization: From Legacy On-Premise to Cloud-Native Infrastructure Case Studies: Cloud-native Data Streaming for Data Warehouse Modernization Lessons Learned from Building a Cloud-Native Data Warehouse The Forrester Wave™: Streaming Data Platforms, Q4 2023 Forrester is a leading research and advisory company that provides insights and analysis on various aspects of technology, business, and market trends. The company is known for its in-depth analysis, market research reports, and frameworks that help organizations navigate the rapidly changing landscape of technology and business. Businesses and IT leaders often use Forrester’s research to understand market trends, evaluate technology solutions, and develop strategies to stay competitive in their respective industries. In December 2023, the research company published "The Forrester Wave™: Streaming Data Platforms, Q4 2023". Get free access to the report here. The leaders are Microsoft, Google, and Confluent, followed by Oracle, Amazon, Cloudera, and a few others. You might agree or disagree with the positions of a specific vendor regarding its offering or strategy strength. But the emergence of this new wave is proof that data streaming is a new software category; not just yet another hype or next-generation ETL / ESB / iPaaS tool. Data Streaming Use Cases by Business Value A new software category opens use cases and adds business value across all industries: Adding business value is crucial for any enterprise. With so many potential use cases, it is no surprise that more and more software vendors add Kafka support to their products. Search my blog for your favorite industry to find plenty of case studies and architectures. Or read about use cases for Apache Kafka across industries to get started. 
The Data Streaming Landscape of 2024 Data Streaming is a separate software category of data platforms. Many software vendors built their entire businesses around this category. The data streaming landscape shows that most vendors use Kafka or implement its protocol because Apache Kafka has become the de facto standard for data streaming. New software companies have emerged in this category in the last few years. And several mature players in the data market added support for data streaming in their platforms or cloud service ecosystem. Most software vendors use Kafka for their data streaming platforms. However, there is more than Kafka. Some vendors only use the Kafka protocol (Azure Event Hubs) or utterly different APIs (like Amazon Kinesis). The following Data Streaming Landscape 2024 summarizes the current status of relevant products and cloud services. Please note: Intentionally, this is not a complete list of frameworks, cloud services, or vendors. It is not an official research landscape. There is no statistical evidence. If your favorite technology is not in this diagram, then I did not see it in my conversations with customers, prospects, partners, analysts, or the broader data streaming community. Also, note that I focus on general data streaming infrastructure. Brilliant solutions exist for using and analyzing streaming data for specific scenarios, like time series databases, machine learning engines, or observability platforms. These are complementary and often connected out of the box to a streaming cluster. Deployment Models: Self-Managed vs. Fully Managed Different data streaming categories exist regarding the deployment model: Self-managed: Operate nodes like Kafka Broker, Kafka Connect, and Schema Registry by yourself with your favorite scripts and tools. This can be on-premise or in the public cloud in your VPC. 
Partially managed: Reduce the operations burden via a cloud-native platform (usually Kubernetes) and related operator tools that automate operations tasks like rolling upgrades or rebalancing Kafka Partitions. This can be on-premise or in the public cloud either in your VPC or in the vendor's VPC. Fully managed: Leverage (truly) fully managed SaaS offerings that do 100 percent of the operations and provide critical SLAs and support to focus on integration and business logic. What About Bring Your Own Cloud (BYOC)? Some vendors offer a fourth deployment model: Bring Your Own Cloud (BYOC), an approach where the software vendor operates a cluster for you in your environment. BYOC is a deployment model that sits somewhere between a SaaS cloud service and a self-managed deployment. I do NOT believe in this approach as too many questions and challenges exist with BYOC regarding security, support, and SLAs in the case of P1 and P2 tickets and outages. Hence, I put this in the category of self-managed. That is what it is, even though the vendor touches your infrastructure. In the end, it is your risk because you have to and want to control your environment. Jack Vanlightly wrote an excellent article "On The Future Of Cloud Services And BYOC." Jack explores the following myths: Myth 1: BYOC enables better security by keeping the data in your account. Myth 2: BYOC is cheaper, with a lower total cost of ownership (TCO). I summarize the story with these two drawings and highly recommend reading Jack's detailed article about BYOC and its trade-offs: Source: Jack Vanlightly Here is Jack's conclusion: "Just as customers moved away from racking their own hardware to move to the cloud, those that try BYOC will similarly migrate away to SaaS for its simplicity, reliability, scalability, and cost-effectiveness." I fully agree. Streaming Categories: Native Kafka vs. Protocol Compatibility vs. 
Stream Processing

Apache Kafka became the de facto standard for data streaming, just as Amazon S3 is the de facto standard for object storage. When you explore the data streaming world, there is no way not to look at the Apache Kafka ecosystem.

The data streaming landscape covers three streaming categories:

Native Apache Kafka: The product or cloud service leverages the open-source framework for real-time messaging and event storage. The Kafka API is 100% compliant. Not included are Kafka Streams and Kafka Connect; many vendors exclude these Kafka features.
Kafka protocol: The product or cloud service implements its own code but supports the Kafka API. These offerings are usually NOT 100% compliant. Kafka Connect and Kafka Streams are usually not part of the offerings.
Stream processing: Frameworks and cloud services correlate data in a stateless or stateful manner. Solutions are either Kafka-native, work together with the Kafka protocol, or run completely independently.

It is really tough to define these categories. For instance, I could add another section for Kafka Connect, or more generally, data integration. Another debate is how to clarify if a vendor supports the complete Kafka API (with or without Kafka Connect or Kafka Streams). But I do not want to have an endless list of solutions. Therefore, the focus is on the Kafka protocol (being the de facto standard for messaging and storage) and related stream processing with Kafka and non-Kafka technologies.

Changes in the Data Streaming Landscape From 2023 to 2024

My goal is NOT a growing landscape with tens or even hundreds of vendors and cloud services. Plenty of these pictures exist. Instead, I focus on a few technologies, vendors, and serverless offerings that I really see used in practice in the field, with excitement from the broader open source and cloud community.
Therefore, the following changes were made compared to the data streaming landscape 2023 published a year ago:

Replaced
Category: I changed "Non-Kafka" to "Stream Processing" to get the right focus on data streaming, not including core messaging solutions like Google Pub/Sub or Amazon Kinesis.
Red Hat: Strategy shift at IBM (again). Red Hat shut down its cloud offering "Red Hat OpenShift Streams for Apache Kafka" and is shifting conversations to IBM products. I replaced the logo with IBM, which still offers Kafka cloud products. But to be clear: Red Hat AMQ Streams (i.e., self-managed Kafka + Strimzi Kubernetes operator) is still sold as a product.

Added
WarpStream: A new entrant into fully managed cloud services using the Kafka protocol.
Confluent: Added to the stream processing category with Kafka Streams, ksqlDB, and Apache Flink (via its Immerok acquisition).
Ververica: Has provided a Flink platform for many years. Honestly, I am not sure why I missed them last year.
Amazon Managed Service for Apache Flink (MSF): Formerly known as Amazon Kinesis Data Analytics (KDA). Added to partially managed stream processing.
Google DataFlow: Added to fully managed stream processing.

Removed
This is a controversial section. Hence, once again, I emphasize that this is just what I see in the field, not statistical research or a survey.
Google Pub/Sub: Focus on data streaming, not message brokers.
TIBCO: Not used for data streaming (beyond TIBCO StreamBase in the financial services market for low-latency trading).
DataStax: I do not see open-source Apache Pulsar much. And I did not see DataStax's Pulsar offering, Luna Streaming, get any traction in the market at all. Also, it seems like the vendor made another strategic shift and now focuses much more on vector databases and Generative AI (GenAI).
Lenses + Conduktor: The reason I removed them is not that they are not used. These are great tools.
But this landscape focuses on data streaming platforms, not complementary management, monitoring, or proxy tools. So many tools exist in the meantime around Kafka - this deserves its own landscape or comparison article. Pravega: I did not see this technology in the field a single time in 2023. Immerok: Acquired by Confluent. Hazelcast: I did not see it in the real world for data streaming scenarios. The technology is well-known as an in-memory data grid, but not for stream processing. As I mentioned the Forrester Wave above, you might realize that I did not include every "strong performer" from the report. E.g., TIBCO, SAS, or Hazelcast. Because I don't see any traction among these vendors in my conversations about event-driven architectures and stream processing. This is no statistical evidence or trying to make other tools bad. Evaluation Criteria for Data Streaming Platforms I often recommend using the following four aspects to look at different frameworks, platforms, and cloud services to evaluate a technology for your business project or enterprise architecture strategy: Cloud-native: Is the solution elastic to scale up and down? Is it fully managed / serverless, or just a bunch of server instances hosted in the cloud? Can you automate the development, operations, and testing process using DevOps, GitOps, test-driven development, and similar principles? Complete: Does the solution offer all required capabilities? Data streaming requires more than just messaging or data ingestion. Hence, does it provide connectors, data processing, governance, security, self-service, open APIs, and so on? Everywhere: Where can you use the solution? Cloud-only? Are all required cloud service providers supported? Is there an option to deploy in a data center or even at the edge (i.e., outside a data center), like a factory, cell tower, or retail store? How can you share data between regions, clouds or data centers? 
What use cases does it support (e.g., aggregation, disaster recovery, hybrid integration, migration, etc.)?
Supported: Is the solution mature and battle-tested? Are public case studies available for your use case or industry? Does the vendor fully support the product? What are the SLAs? Are specific features excluded from commercial enterprise support? Some vendors offer data streaming cloud services (like a managed Kafka service) and exclude support in the terms and conditions (which many people, unfortunately, don't read for public cloud services).

Let's take a deeper look into the different data streaming categories and start with the leading technology: Native Apache Kafka...

Native Apache Kafka for Data Streaming

Let's start with the leading data streaming technology, Apache Kafka, and related vendors and SaaS offerings. The growth of the Apache Kafka community in the last few years is impressive. Here are some statistics that Jay Kreps presented last year at the data streaming conference "Current - The Next Generation of Kafka Summit" in Austin, Texas:
>100,000 organizations using Apache Kafka
>41,000 Kafka meetup attendees
>32,000 Stack Overflow questions
>12,000 Jiras for Apache Kafka
>31,000 open job listings requesting Kafka skills

And look at the increased number of active monthly unique users downloading the Kafka Java client library with Maven:
Source: Sonatype

Apache Kafka Vendors: Self-Managed vs. Cloud Offerings

New software companies focus on data streaming. And even long-standing companies such as IBM and Oracle followed the trend in the past few years. On a top level, to keep it simple, three kinds of offerings exist for Apache Kafka: I made a detailed comparison of on-premise Kafka vendors and cloud services using this car analogy. Only Amazon MSK Serverless (i.e., the fully managed service, not the partially managed MSK) was not available when writing this comparison. Hence, also read Confluent Cloud versus Amazon MSK Serverless.
Here are a few notes on each technology as a summary. Apache Kafka: The de facto standard for data streaming. Open source with a vast community. All the vendors in this list rely on (parts of) this project. Confluent: Provides data streaming everywhere with Confluent Platform (self-managed) and Confluent Cloud (fully managed and available across all major cloud providers). Cloudera: Provides Kafka as a self-managed offering. Focuses on combining many data technologies like Kafka, Hadoop, Spark, Flink, NiFi, and many more. IBM: Provides Kafka as a partially managed cloud offering and self-managed Kafka on Kubernetes via OpenShift (through Red Hat). Kafka is part of the integration portfolio that includes other open-source frameworks like Apache Camel. AWS: Provides two separate products with Amazon MSK (partially managed) and Amazon MSK Serverless (fully managed). Both products have very different functionalities and limitations. Both MSK offerings exclude Kafka support (read the terms and conditions). AWS has hundreds of cloud services, and Kafka is part of that broad spectrum. Only available in public AWS cloud regions; not on Outposts, Local Zones, Wavelength, etc. Instaclustr and Aiven: Partially managed Kafka cloud offerings across cloud providers. The product portfolios offer various hosted services of open-source technologies. Instaclustr also offers a (semi-)managed offering for on-premise infrastructure. Microsoft Azure HDInsight: A piece of Azure's Hadoop infrastructure. Not intended for other use cases. Only available in public Azure cloud regions. This is no comparison. Just a list with a few notes. Make your own evaluation of your favorite vendors. Check what you need: Cloud-native? Complete? Everywhere? Supported? Keep in mind that many vendors exclude or do not focus on Kafka Streams and Kafka Connect and only offer incomplete Kafka; they want to sell their own integration and processing products instead. Don't compare apples and oranges! 
Open Source Frameworks and SaaS Using the Kafka Protocol

A few vendors don't rely on open-source Apache Kafka but built their own implementations on top of the Kafka protocol for different reasons. Marketing will not tell you, but the Kafka protocol compatibility is limited. This can create risk when running existing Kafka workloads against the cluster, and operations and execution differ (which can be good or bad). Here are a few notes on each technology as a summary:

Apache Pulsar: A competitor to Apache Kafka. Similar story and use cases, but different architecture. Kafka is a single distributed cluster (after removing the ZooKeeper dependency in 2022). Pulsar is three (!) distributed clusters: Pulsar brokers, ZooKeeper, and BookKeeper. It is too late now to get more market traction. And Kafka is catching up on some missing features, such as Queues for Kafka.
StreamNative: The primary vendor behind Apache Pulsar. Offers self-managed and fully managed solutions. StreamNative Cloud for Kafka is still in a very early stage of maturity, and I am not sure if it will ever get stronger. No surprise you can now also choose BYOC instead.
Redpanda: A relatively new entrant into the data streaming market offering self-managed and fully managed products. Interesting approach to implementing the Kafka protocol with C++. It might take some market share if they can find the proper use cases and differentiators. Today, I don't see Redpanda as an alternative to a Kafka-native offering because of its early stage in the maturity curve and no added value for solving business problems versus the added risk compared to Apache Kafka.
Azure Event Hubs: A mature, fully managed cloud service. The service does one thing, and does it very well: Data ingestion via the Kafka protocol. Hence, it is not a complete streaming platform but is more comparable to Amazon Kinesis or Google Cloud Pub/Sub. Only available in public Azure cloud regions.
The limited compatibility with the Kafka protocol and the high cost of the service are the two blockers I hear regularly.
WarpStream: A new entrant into the data streaming market. The cloud service is a Kafka-compatible data streaming platform built directly on top of S3. The higher latency is probably okay for some Kafka use cases. But only the future will show if this differentiating architecture plays a key role after other Kafka cloud services adopt Kafka's KIP-405 for Tiered Storage (which has been available in early access since Kafka 3.6).

Be careful about statements from vendors that reimplement the Kafka protocol. Most of these vendors oversell the Kafka protocol compatibility. Additionally, "benchmarketing" (i.e., picking a sweet spot or niche scenario where you perform better than your competitor) is the favorite marketing technique to "prove" differentiators from the real Apache Kafka.

Stream Processing Technologies

While Apache Kafka is the de facto standard for message and event storage, many complementary and competitive technologies exist for stream processing: Even more technologies are emerging these days because of the growth of this software category across the globe and all industries. That's excellent news. Data streaming is here to stay and grow. The situation is challenging to explore as part of the data streaming landscape, as some products are both complementary and competitive to the Apache Kafka ecosystem.

Apache Flink Adoption and Growth

Fun fact: The leading conference for Kafka was rebranded from "Kafka Summit" to "Current - The Next Generation of Kafka Summit" in 2022. Why? Because data streaming is more than Kafka. Many complementary and competitive technologies were present, including vendors, booths, demos, and customer case studies. That's a remarkable evolution of data streaming for the community and enterprises across the globe! Apache Flink is becoming the de facto standard for stream processing.
The rise of Flink looks very similar to Kafka's adoption a few years ago: But don’t underestimate the power and use cases of Kafka-native stream processing with Kafka Streams. The adoption rate is massive, as Kafka Streams is easy to use. And it is part of Apache Kafka. Some Stream Processing Products Are Complementary to Kafka Each stream processing framework or cloud service has trade-offs. While Flink gets a lot of traction, there are others, too. There is no single size that fits all use cases. Here are a few mature and emerging technologies that complement Apache Kafka. Open Source Stream Processing Frameworks Kafka Streams: Part of open-source Apache Kafka. Hence, if you download Kafka from the Apache website, it always includes the library. You should always ask yourself if you need another framework besides Kafka Streams for stream processing. The significant benefit: One technology, one vendor, one infrastructure. ksqlDB (usually called KSQL, even after the rebranding): An abstraction layer on top of Kafka Streams to provide stream processing with streaming SQL. Hence, also Kafka-native. It comes with a Confluent Community License and is free to use. Sweet spot: Streaming ETL. Apache Flink: Leading open-source stream processing framework. Advanced features include a powerful scalable compute engine (separated from Kafka), freedom of choice for developers between ANSI SQL, Java, and Python, APIs for Complex Event Processing (CEP), and unified APIs for stream and batch workloads. Spark Streaming: The streaming part of the open-source big data processing framework Apache Spark. I am still not 100 percent convinced. Kafka Streams and Apache Flink are the better choices for stream processing. However, the enormous installed base of Spark clusters in enterprises broadens adoption. Stream Processing Vendors and Cloud Services Ververica: Well-known Flink company. Acquired by Chinese giant Alibaba in 2019. 
Not much traction in Europe and the US, but definitely an expert player for Flink in Asia. Personally, I have never seen the vendor being used outside of Asia.
Decodable: A new cloud service. Very early stage. I still added it, as I think it is an excellent strategic move to build a data streaming cloud service on top of Apache Flink. Huge potential if it is combined with existing Kafka infrastructures in enterprises. But it also provides pre-built connectors for non-Kafka systems.
Amazon Managed Service for Apache Flink (MSF): An (almost) fully managed service by AWS that allows customers to transform and analyze streaming data in real-time with Apache Flink. It still has some (costly) gaps in auto-scaling and is not truly serverless. Supports all Flink interfaces, i.e., SQL, Java, and Python. And non-Kafka connectors, too. Only available on AWS.
Confluent Cloud: A truly serverless Flink offering that is elastic and scales to zero if not used. Initially supports only SQL. Java and Python support coming in 2024. Starts on AWS, but expected to be available on GCP and Azure in early 2024. Deeply integrated with fully managed Kafka, Schema Registry, Connectors, Data Governance, and other features of Confluent. Provides a seamless end-to-end developer experience for data streaming pipelines.
Databricks: Was the leading vendor behind Apache Spark. Today, nobody talks about Spark anymore (even if it is a key technology piece of Databricks' cloud platform). Trying to get much more into the business of real-time data. I like the platform for analytics, reporting, and AI/machine learning. But I am not convinced by the data lakehouse story around "doing everything within one big data lake."

Most of these services work well with the solutions of other vendors. For instance, Databricks integrates easily with any Kafka environment, and Amazon MSF connects directly to Confluent's Kafka.

Apache Flink (or Spark Streaming) WITHOUT Kafka?
Most stream processing technologies complement Apache Kafka. But stream processing frameworks like Flink or cloud services like Databricks do NOT need Kafka as an ingestion layer. There are other options... Flink, Spark, et al. can consume data from other streaming platforms or directly from data stores like a file or database. Be careful with the latter: If you use Flink or Spark Streaming for stream processing, that's fine. But if the first thing to do is read the data from an S3 object store, well, that is data at rest. BUT: A common trend in the data streaming market is long-term storage of (some) events within the event streaming platform. Especially, introducing Tiered Storage for Kafka changed the capabilities and use cases. The support for object storage by some vendors via the S3 interface can be an entire game changer for storing and processing events in real-time with the Kafka protocol or with other analytics engines and databases in near-real-time or batch. And Apache Iceberg might be the next trend we talk about for the 2025 streaming landscape. And understand that stream processing applications can also keep state: The backend of your Kafka Streams or Flink app can store state for your tasks like enrichment purposes. A stream processing application is not just about real-time data feeds. It also correlates these real-time feeds with (already ingested) historical data. This is a common approach for metadata or business data that is updated less frequently (like from an SAP ERP system). Some Stream Processing Products Are Competitive to Kafka In some situations, you must evaluate whether Apache Kafka or another technology is the right choice. Here are a few open-source and cloud competitors: Amazon Kinesis: Data ingestion into AWS data stores. Mature product for a specific problem. Only available on AWS. Google Cloud DataFlow: Fully managed service for executing Apache Beam pipelines within the Google Cloud Platform ecosystem. 
Mature product for a specific problem. Only available on GCP.

Many competitive startups are emerging around stream processing outside of the Kafka and Flink world. Let's see if some get traction in 2024. Amazon Kinesis and Google Cloud DataFlow are excellent cloud services if you "just" want to ingest data into a specific cloud storage service. If there are no other use cases, these tools might be the right choice (if pricing at scale and other limitations work for you). Apache Kafka is a much more flexible and strategic data streaming platform. Many projects still start with data ingestion and build the first pipeline. But providing access to the same stream of events to any other data sink or for powerful stream processing with tools like Kafka Streams or Apache Flink is a significant advantage.

Potential for the Data Streaming Landscape 2025

Data streaming is a journey. So is the development of event streaming platforms and cloud services. Several established software and cloud vendors might get more traction with their data streaming offerings. And some startups might grow significantly. The following list shows a few technologies that might evolve and see growing adoption in 2024:

Additional Kafka cloud services like DigitalOcean or Oracle Cloud Infrastructure (OCI) Streaming might get more traction. I have not seen these yet in the field. But, for example, combining Oracle GoldenGate with OCI Streaming might be interesting to some organizations for some use cases.
Hazelcast has rebranded significantly and is part of Forrester's Streaming Data Wave. With both an on-premise and serverless cloud offering, the technology might get traction for data streaming and come back to my landscape next year.
Streaming databases like Materialize or RisingWave might become their own category. My feeling: They are at the super early stage of the hype cycle. We will see in 2024 if and where this technology gets more broadly adopted and what the use cases are.
It is hard to answer how these will compete with emerging real-time analytics databases like Apache Druid, Apache Pinot, ClickHouse, Rockset, Timeplus, et al. I know there are differences, but the broader community and companies need to a) understand these differences and b) find business problems for them.
SaaS startups like Quix and Bytewax (both stream processing with Python), DeltaStream (powered by Apache Flink), and many more have emerged. Let's see which of these get traction in the market with an innovative product and business model.
Existing multi-product enterprises extend their offerings around Kafka with separate Flink services. For instance, Aiven now has a Flink product that might get traction like its Kafka offering.
Traditional data management vendors like MongoDB or Snowflake are trying to get deeper into the data streaming business. I am still a fan of separation of concerns, so I think these should keep their sweet spot and (only) provide streaming ingestion and CDC as use cases, but not (try to) compete with data streaming vendors.

Fun fact: The business model of almost all emerging startups is fully managed cloud services, not selling licenses for on-premise deployments. Many are based on open-source or open-core, and others only provide a proprietary implementation.

The Data Streaming Journey Is a Long One...

Data streaming is not a race; it is a journey! Event-driven architectures and technologies like Apache Kafka or Apache Flink require a mind shift in architecting, developing, deploying, and monitoring applications. Legacy integration, cloud-native microservices, and data sharing across hybrid and multi-cloud setups are the norm, not an exception. The data streaming landscape 2024 shows how a new software category is emerging. We are still in an early stage. A new software category takes time to create.
In most conversations with customers, partners, and the community, I hear statements like: "We see the value, but we are not there yet — we now start with building first data streaming pipelines and have a roadmap for the next years to add more advanced stream processing."
Open table formats are file formats tailored to store vast datasets in distributed data processing systems. They streamline data storage with features like:
Columnar storage for analytical workloads
Compression for reduced storage costs and improved performance
Schema evolution for adapting to changing data structures
ACID compliance, ensuring data integrity
Support for transactional operations
Time travel capabilities for historical data querying
Seamless integration with various data processing frameworks and ecosystems

These characteristics collectively enable the construction of scalable, dependable, and efficient data processing pipelines, making open table formats preferred options in contemporary data architectures and analytics workflows. Let us dive deep into the open table formats Apache Iceberg, Apache Hudi, and Delta Lake.

Apache Iceberg

Apache Iceberg is an open-source table format designed for large-scale data lakes, aiming to improve data reliability, performance, and scalability. Its architecture introduces several key components and concepts that address the challenges commonly associated with big data processing and analytics, such as managing large datasets, schema evolution, efficient querying, and ensuring transactional integrity. Here's a deep dive into the core components and architectural design of Apache Iceberg:

Figure 1: Apache Iceberg architecture (Source: Dremio)

1. Table Format and Metadata Management

Versioned Metadata
Iceberg uses a versioned metadata management system where each write operation on a table creates a new snapshot of the table metadata. This approach ensures atomicity and consistency and supports ACID transactions: a change is either fully applied or not applied at all.

Snapshot Management
Each snapshot contains full table metadata, including schema information, partitioning details, and file lists. Snapshots enable time travel capabilities, allowing users to query data as it was at the time of any retained snapshot.
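The snapshot idea described above can be illustrated with a small, self-contained sketch. It is a toy model, not Iceberg's actual metadata layout or API: the names `Snapshot`, `ToyTable`, `commit`, and `as_of` are hypothetical, and commit timestamps are assumed to increase monotonically.

```python
from bisect import bisect_right
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical immutable snapshot: the full set of live data files."""
    snapshot_id: int
    timestamp_ms: int
    data_files: frozenset


@dataclass
class ToyTable:
    """Toy table whose history is an append-only list of snapshots."""
    snapshots: list = field(default_factory=list)

    def commit(self, timestamp_ms, added=(), removed=()):
        # Every write produces a new snapshot; older snapshots are never mutated.
        current = self.snapshots[-1].data_files if self.snapshots else frozenset()
        files = (current - frozenset(removed)) | frozenset(added)
        snap = Snapshot(len(self.snapshots) + 1, timestamp_ms, files)
        self.snapshots.append(snap)
        return snap

    def as_of(self, timestamp_ms):
        # Time travel: pick the latest snapshot committed at or before the timestamp.
        # Assumes snapshots were committed with increasing timestamps.
        times = [s.timestamp_ms for s in self.snapshots]
        idx = bisect_right(times, timestamp_ms)
        if idx == 0:
            raise ValueError("no snapshot at or before that time")
        return self.snapshots[idx - 1]


table = ToyTable()
table.commit(1000, added=["a.parquet"])
table.commit(2000, added=["b.parquet"])
table.commit(3000, added=["c.parquet"], removed=["a.parquet"])

print(sorted(table.as_of(2500).data_files))  # ['a.parquet', 'b.parquet']
print(sorted(table.as_of(3000).data_files))  # ['b.parquet', 'c.parquet']
```

Because readers always resolve one immutable snapshot, a query that started before the third commit keeps seeing `a.parquet` even after it was logically removed.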
Metadata Files Metadata is stored in JSON format, making it easily readable and accessible. The use of lightweight metadata files also simplifies operations like schema evolution and partitioning changes, as these operations only require metadata updates without affecting the actual data. 2. File Organization and Partitioning Partitioning Iceberg introduces a flexible partitioning system that supports partition evolution. Partitions are defined in the table metadata, allowing for changes over time without the need to rewrite data. This significantly reduces the complexity of managing evolving datasets. File Layout Data is organized into files stored in object storage. Iceberg supports multiple file formats, including Parquet, Avro, and ORC. Files are grouped into "manifests" for efficient metadata management. Hidden Partitioning Iceberg's partitioning is logical and decoupled from the physical storage, enabling optimizations like predicate pushdown for efficient data access without the need for costly directory traversals. 3. Scalability and Performance Incremental Processing Iceberg tables are designed for efficient incremental data processing. By tracking additions and deletions in snapshots, Iceberg enables consumers to process only the changes between snapshots, reducing the amount of data to scan. Scalable Metadata Operations The architecture is designed to scale metadata operations, allowing for efficient handling of large datasets. The use of compact metadata files and manifest lists helps in managing extensive datasets without performance degradation. 4. Query Engine Integration Broad Ecosystem Support Iceberg is designed to integrate seamlessly with a wide range of query engines and data processing frameworks, including Apache Spark, Trino, Flink, and Hive. This is achieved through a well-defined API that allows these engines to leverage Iceberg's features like snapshot isolation, schema evolution, and efficient file pruning. 5. 
Transaction Support and Concurrency ACID Transactions Iceberg provides ACID transactions to ensure data integrity, supporting concurrent reads and writes. The optimistic concurrency model allows multiple operations to proceed in parallel, with conflict detection and resolution mechanisms in place to maintain consistency. 6. Schema Evolution and Compatibility Schema Evolution Iceberg supports adding, renaming, deleting, and updating columns while maintaining backward and forward compatibility. This allows for schema changes without downtime or data migration. Apache Iceberg's architecture is designed to address the limitations of traditional data lakes by providing reliable ACID transactions, efficient metadata management, and scalable data processing capabilities. Its flexible partitioning, versioned metadata, and integration with popular query engines make it a robust solution for managing large-scale data lakes in a variety of use cases, from analytical workloads to real-time streaming. Apache Hudi Apache Hudi (short for Hadoop Upserts Deletes and Incrementals) is an open-source data management framework used to simplify incremental data processing and data pipeline development on top of data lakes like HDFS, S3, or cloud-native data services. Hudi brings stream processing to big data, providing fresh data while also efficiently storing large datasets. Here's a deep dive into the architecture and core components of Apache Hudi: Figure 2: Apache Hudi architecture (Source) 1. Core Concepts Table Types Hudi supports two types of tables: Copy on Write (CoW) and Merge on Read (MoR). CoW tables are optimized for read-heavy workloads with simpler write patterns, where each write operation creates a new version of files. MoR tables support more complex workloads with frequent reads and writes, storing data in a combination of columnar (for efficient reading) and row-based formats (for efficient upserts). 
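To make the Merge on Read idea more concrete, here is a minimal sketch in plain Python. It is not Hudi's API or file layout: `MergeOnReadFileGroup` and its methods are hypothetical names, the "base file" is a dictionary keyed by record key, and the "log" is an in-memory list.

```python
class MergeOnReadFileGroup:
    """Toy model: a read-optimized base plus an append-only change log."""

    def __init__(self, base_records):
        self.base = dict(base_records)  # record key -> row (stands in for a columnar base file)
        self.log = []                   # (op, key, row) entries appended by writers

    def upsert(self, key, row):
        # Writes are cheap: append to the log instead of rewriting the base.
        self.log.append(("upsert", key, row))

    def delete(self, key):
        self.log.append(("delete", key, None))

    def read(self):
        # Readers merge the base with the log, applying changes in order.
        merged = dict(self.base)
        for op, key, row in self.log:
            if op == "upsert":
                merged[key] = row
            else:
                merged.pop(key, None)
        return merged

    def compact(self):
        # Compaction folds the log into a new base so later reads skip the merge.
        self.base = self.read()
        self.log = []


fg = MergeOnReadFileGroup({"u1": {"city": "Oslo"}, "u2": {"city": "Lima"}})
fg.upsert("u2", {"city": "Quito"})
fg.upsert("u3", {"city": "Pune"})
fg.delete("u1")

before = fg.read()
fg.compact()
after = fg.read()
print(before == after, len(fg.log))  # True 0
```

A Copy on Write table would, in this toy model, correspond to calling `compact()` after every write: reads stay simple, but each write pays the cost of rewriting the base.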
Record Keys and Partitioning Hudi tables are indexed by a record key, and data is partitioned into directories on the file system based on a partition path. This structure enables efficient upserts (updates and inserts) and deletes. 2. Data Storage and Management File Size Management Hudi automatically manages file sizes and layouts to optimize read and write performance. It compacts small files and organizes data into larger ones to improve efficiency. Indexing Hudi maintains indexes to quickly locate records for updates or deletes, significantly reducing the amount of data that needs to be scanned during such operations. Log Files for MoR Tables In Merge on Read tables, Hudi uses log files to store incoming writes (upserts and deletes) efficiently. This allows for quicker writes, deferring the merging of data into columnar files until read or compaction time. 3. Transactions and Concurrency ACID Transactions Hudi provides snapshot isolation for reads and writes, enabling transactions. This ensures data integrity and consistency even in the presence of concurrent operations. Optimistic Concurrency Control Hudi employs optimistic concurrency control to manage concurrent writes. It resolves conflicts by retrying or failing operations, depending on the conflict resolution strategy. 4. Incremental Processing Change Capture and Incremental Pulls Hudi supports capturing changes to data at the record level, enabling efficient incremental data processing. Applications can query for data changes at a specific point in time, reducing the amount of data to process. 5. Query Engine Integration Wide Compatibility Hudi integrates with popular query engines like Apache Spark, Apache Flink, Presto, and Hive. This allows users to query Hudi tables using familiar tools and APIs. 6. Scalability and Performance Scalable Metadata Management Hudi is designed to handle large datasets by efficiently managing metadata. 
It leverages compact, serialized metadata formats and scalable indexing mechanisms to maintain performance. Data Compaction For Merge on Read tables, Hudi performs background compaction of log files into columnar formats, optimizing read performance without impacting ongoing writes. 7. Data Management Features Time Travel Hudi supports querying data as of a specific point in time, enabling time travel queries for auditing or rollback purposes. Schema Evolution Hudi handles schema changes gracefully, allowing for additions, deletions, and modifications of table schema without disrupting data processing. Apache Hudi's architecture is designed to address the complexities of managing large-scale data lakes by providing efficient upserts, deletes, and incremental processing capabilities. Its integration with popular big data processing frameworks, ACID transaction support, and optimizations for both read and write performance make it a powerful tool for building high-throughput, scalable data pipelines. Hudi's approach to data management enables faster data refreshes and simplifies handling late-arriving data, making it an essential component in modern data architecture for real-time analytics and data processing. Delta Lake Architecture Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads, designed to provide a more reliable and performant data lake. Delta Lake enables users to build robust data pipelines that are resilient to failures, support concurrent reads and writes, and allow for complex data transformations and analytics. Here's a deep dive into the core components and architectural design of Delta Lake: Figure 3: Delta Table (Source: Delta Lake Documentation) 1. Core Concepts and Components ACID Transactions Delta Lake ensures data integrity and consistency across reads and writes by implementing ACID transactions. 
This is achieved through atomic operations on the data, which are logged in a transaction log, ensuring that each operation either fully completes or does not happen at all. Delta Table A Delta table is a versioned Parquet table with a transaction log. The transaction log is a record of every change made to the table and is used to ensure consistency and enable features like time travel. Schema Enforcement and Evolution Delta Lake enforces schema validation on write operations, preventing bad data from causing data corruption. It also supports schema evolution, allowing for the addition of new columns and changes to the table schema without breaking existing queries. 2. Transaction Log The transaction log (often referred to as the Delta Log) is a key component of Delta Lake's architecture. It consists of JSON files that record each commit as an ordered, immutable log entry. This log allows Delta Lake to: Maintain a timeline: Track all transactions and modifications to the table, supporting atomicity and consistency. Support time travel: Query previous versions of the table, enabling data rollback and auditing. Enable concurrency: Manage concurrent reads and writes efficiently using optimistic concurrency control. 3. Data Storage and Management Parquet Files Delta Lake stores data in Parquet files, leveraging Parquet's efficient columnar storage format. Parquet files are immutable, and modifications create new versions of files, which are then tracked through the transaction log. File Management Delta Lake optimizes file storage by compacting small files and coalescing them into larger ones to improve read performance. It also supports partitioning to enhance query performance. 4. Scalability and Performance Optimized Layouts Delta Lake uses Z-ordering and data skipping to optimize the layout of data on disk, significantly reducing the amount of data scanned for queries.
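As a rough illustration of why Z-ordering helps data skipping, the sketch below interleaves the bits of two integer columns into a Morton code, sorts rows by it, and compares per-file min/max ranges against a plain lexicographic sort. This is the generic textbook technique on a tiny grid, not Delta Lake's implementation; `z_value`, `file_stats`, and `files_to_scan` are hypothetical helper names.

```python
def z_value(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one Morton (Z-order) code."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)      # x bits go to even positions
        code |= ((y >> i) & 1) << (2 * i + 1)  # y bits go to odd positions
    return code


def file_stats(rows, rows_per_file):
    """Split rows into fixed-size 'files' and record min/max per column."""
    files = [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]
    return [
        {"x": (min(r[0] for r in f), max(r[0] for r in f)),
         "y": (min(r[1] for r in f), max(r[1] for r in f))}
        for f in files
    ]


def files_to_scan(stats, column, value):
    """Data skipping: keep only files whose min/max range can contain `value`."""
    return [i for i, s in enumerate(stats) if s[column][0] <= value <= s[column][1]]


points = [(x, y) for x in range(4) for y in range(4)]
linear = file_stats(sorted(points), 4)                                # sorted by x, then y
zordered = file_stats(sorted(points, key=lambda p: z_value(*p)), 4)   # sorted by Morton code

print(files_to_scan(linear, "y", 0))    # [0, 1, 2, 3]
print(files_to_scan(zordered, "y", 0))  # [0, 1]
print(files_to_scan(zordered, "x", 0))  # [0, 2]
```

With the lexicographic sort, a filter on `y` cannot skip any file, while the Z-ordered layout lets filters on either column skip half of the files in this example.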
Streaming and Batch Processing Delta Lake integrates streaming and batch data processing within the same pipeline, so that data is up to date and consistent across all operations. 5. Advanced Data Operations Upserts, Deletes, and Merges Delta Lake supports advanced data operations like upserts and merges (MERGE INTO) and deletes, making it easier to manage and maintain data lakes by simplifying complex transformations and updates. Incremental Processing Delta Lake allows for efficient incremental processing of data changes, enabling the building of complex ETL pipelines that can process only the data that has changed since the last operation. 6. Integration With the Data Ecosystem Delta Lake is deeply integrated with Apache Spark, and its APIs are designed to be used seamlessly with Spark DataFrames. This tight integration allows for high-performance data transformations and analytics. Additionally, Delta Lake can be used with other data processing and query engines, enhancing its versatility in a multi-tool data architecture. Delta Lake's architecture addresses many of the challenges faced by data engineers and scientists working with big data, such as ensuring data integrity, supporting complex transactions, and optimizing query performance. By providing ACID transactions, scalable metadata handling, and robust data management features, Delta Lake enables more reliable and efficient data pipelines, making it a foundational component of modern data platforms.
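The role of the transaction log in the Delta Lake section above (ordered commits, time travel, optimistic concurrency) can be sketched with a toy in-memory log. This is a simplified model, not the Delta Lake library: `ToyDeltaLog`, `try_commit`, and `files_at` are hypothetical names, commits are kept in a dictionary instead of JSON files on storage, and a real implementation also checks whether a retried commit logically conflicts with the commits it lost to.

```python
import json


class ToyDeltaLog:
    """Toy transaction log: version number -> JSON list of add/remove actions."""

    def __init__(self):
        self.commits = {}

    def latest_version(self):
        return max(self.commits, default=-1)

    def try_commit(self, read_version, actions):
        # Optimistic concurrency: the writer claims version read_version + 1.
        # If another writer already created it, this attempt fails and the
        # caller is expected to re-read the log and retry.
        version = read_version + 1
        if version in self.commits:
            return False
        self.commits[version] = json.dumps(actions)
        return True

    def files_at(self, version):
        # Time travel: replay commits 0..version to rebuild the live file set.
        live = set()
        for v in range(version + 1):
            for action in json.loads(self.commits[v]):
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
        return live


log = ToyDeltaLog()
log.try_commit(-1, [{"add": {"path": "part-0.parquet"}}])
log.try_commit(0, [{"add": {"path": "part-1.parquet"}}])

# Two writers both read version 1 and race to write version 2.
ok_a = log.try_commit(1, [{"remove": {"path": "part-0.parquet"}},
                          {"add": {"path": "part-2.parquet"}}])
ok_b = log.try_commit(1, [{"add": {"path": "part-3.parquet"}}])
# The losing writer re-reads the latest version and retries.
ok_b_retry = log.try_commit(log.latest_version(), [{"add": {"path": "part-3.parquet"}}])

print(ok_a, ok_b, ok_b_retry)   # True False True
print(sorted(log.files_at(1)))  # ['part-0.parquet', 'part-1.parquet']
```

Because old data files are only removed logically in the log, replaying to an earlier version still yields a consistent file list, which is what makes rollback and auditing possible in this model.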
Comparison Table Between Iceberg, Hudi, and Delta Lake

Feature | Apache Iceberg | Apache Hudi | Delta Lake
Foundation | Open table format for huge analytic datasets | Data management framework for incremental processing | Storage layer that brings ACID transactions
Primary Use Case | Improving data reliability, performance, and scalability in data lakes | Simplifying data pipeline development on data lakes | Making big data workloads more reliable and performant
Data Structure | Table format with versioned metadata | Supports Copy on Write (CoW) and Merge on Read (MoR) tables | Versioned Parquet table with a transaction log
Schema Evolution | Supports adding, renaming, deleting, and updating columns | Handles schema changes, allowing additions, deletions, and modifications | Enforces schema validation and supports evolution
Partitioning | Flexible partitioning system with partition evolution | Indexed by record key, partitioned into directories | Supports partitioning, optimized through Z-ordering
ACID Transactions | Yes, with atomic operations and snapshot isolation | Yes, it provides snapshot isolation for reads and writes | Yes, it ensures data integrity across reads and writes
Concurrency and Conflict Resolution | Optimistic concurrency model | Optimistic concurrency control with conflict resolution | Manages concurrent reads and writes efficiently
Incremental Processing | Designed for efficient incremental data processing | Captures changes at the record level for efficient incremental pulls | Allows incremental processing of data changes
Time Travel | Snapshot management for querying data at any point in time | Snapshot management for querying data at any point in time | Time travel for data rollback and auditing
File Formats | Supports multiple formats like Parquet, Avro, ORC | Manages file sizes and layouts, compacting for efficiency | Stores data in Parquet files, optimizing file management
Query Engine Integration | Broad support (e.g., Apache Spark, Trino, Flink) | Compatible with popular query engines like Spark, Flink, Presto | Deeply integrated with Apache Spark
Performance Optimizations | Metadata management for large datasets, hidden partitioning | Scalable metadata and indexing, file size management | Optimizes layout with Z-ordering, data skipping
Operational Features | Supports upserts, deletes, and schema evolution with minimal impact on performance | Advanced data operations like upserts, deletes, merges | Advanced operations like upserts, deletes, merges

Conclusion

Open table formats, such as Apache Iceberg, Apache Hudi, and Delta Lake, represent a significant advancement in managing vast datasets within distributed data processing systems. These formats bring many features that enhance data storage, processing, and analysis in traditional big data ecosystems like Apache Spark and Hadoop, and in modern cloud-based data lakes. Key attributes include columnar storage optimization for analytical processing, data compression to minimize storage costs and boost performance, and schema evolution capabilities that adapt to changing data structures. Additionally, they ensure data integrity through ACID compliance, support transactional operations, offer time travel features for accessing historical data, and facilitate seamless integration across diverse data processing frameworks.
Snowflake, the cloud-based data warehousing platform, has gained significant traction in recent years due to its innovative features and performance optimizations. One of these key features is micro-partitioning, which enhances storage and query performance. In this article, we will delve deeper into the technical aspects of Snowflake's micro-partitioning, discuss its advantages, and provide an advanced developer guide with examples. Understanding Micro-Partitioning at a Deeper Level Micro-partitioning in Snowflake can be better understood by examining its core components: Data Ingestion and Clustering Snowflake ingests data using the COPY command or Snowpipe, both of which automatically divide data into micro-partitions based on natural clustering patterns. Micro-partitions are created using a range-based clustering algorithm that sorts input data on one or more clustering keys. This process ensures that related data is co-located within the same micro-partition, reducing the amount of data scanned during query execution. Columnar Storage Snowflake stores each micro-partition in a columnar format, where values for a single column are stored together. This format enables efficient compression and encoding schemes, such as Run-Length Encoding (RLE) and Delta Encoding, which reduce storage costs and improve query performance. Metadata Management Snowflake maintains metadata about each micro-partition, including the minimum and maximum values for each column (known as min-max pruning), the number of distinct values (NDV), and the partition's size. The Query Optimizer leverages this metadata to prune irrelevant micro-partitions and minimize data scanned during query execution. Example: Consider a table with columns A, B, and C. If a user executes a query with a filter condition "WHERE A > 100", the Query Optimizer uses the metadata for column A to identify and prune micro-partitions where the maximum value of A is less than or equal to 100. 
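The pruning step in the example above can be sketched in a few lines of Python. This is only an illustration of the min-max idea, not Snowflake's internal implementation; the `MicroPartition` class, the helper functions, and the sample values are assumptions made up for the sketch.

```python
from dataclasses import dataclass


@dataclass
class MicroPartition:
    name: str
    min_a: int      # smallest value of column A in this partition
    max_a: int      # largest value of column A in this partition
    rows: list      # the column A values themselves


def make_partition(name, values):
    """Build a partition and record its min/max metadata at load time."""
    return MicroPartition(name, min(values), max(values), values)


def prune_greater_than(partitions, threshold):
    """Keep only partitions that could contain a row with A > threshold."""
    return [p for p in partitions if p.max_a > threshold]


def query_a_greater_than(partitions, threshold):
    """Evaluate WHERE A > threshold, scanning only the surviving partitions."""
    scanned = prune_greater_than(partitions, threshold)
    result = [a for p in scanned for a in p.rows if a > threshold]
    return result, [p.name for p in scanned]


partitions = [
    make_partition("p1", [5, 40, 100]),
    make_partition("p2", [90, 120, 150]),
    make_partition("p3", [200, 250]),
]
rows, scanned = query_a_greater_than(partitions, 100)
```

Here `p1` is skipped without reading its rows because its maximum is exactly 100, while `p2` still has to be scanned even though some of its values fail the filter.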
This process significantly reduces the amount of data scanned and improves query performance. Advantages of Micro-Partitioning Improved query performance: Micro-partitioning enables Snowflake to optimize query performance by minimizing the amount of data scanned during execution. This is achieved through metadata-based pruning and the co-location of related data within micro-partitions. Scalability: Micro-partitioning allows Snowflake to distribute data across multiple nodes in a cluster, enabling horizontal scaling. As your data grows, you can add more compute resources to maintain optimal query performance. Storage efficiency: The columnar storage format within micro-partitions allows for efficient compression and encoding, reducing storage costs. Data protection: Snowflake's micro-partitioning architecture provides built-in data protection features, such as automatic replication and failover, ensuring high availability and durability for your data. Advanced Developer Guide to Micro-Partitioning Load data efficiently: To maximize the benefits of Snowflake's micro-partitioning, load data in large, sorted batches using the COPY command or Snowpipe. Sorting data on one or more clustering keys before ingestion will help Snowflake create well-clustered micro-partitions. Example: Use the following COPY command to load sorted data from a CSV file into a table: SQL COPY INTO my_table FROM '@my_stage/my_data.csv' FILE_FORMAT = (TYPE = 'CSV') FORCE = TRUE; Optimize queries: Leverage Snowflake's metadata to optimize your queries, using filter predicates and join conditions that take advantage of min-max pruning and NDV-based optimizations. 
Monitor clustering: Regularly monitor the clustering score for your tables using the following query: SQL SELECT SYSTEM$CLUSTERING_INFORMATION('my_table', '(clustering_key_1, clustering_key_2)'); A low clustering score indicates that your data is not well-clustered within micro-partitions, and you should consider re-clustering your data using the ALTER TABLE RECLUSTER command. Leverage time travel and data sharing: Utilize Snowflake's Time Travel feature to access historical data by specifying a time offset in your queries: SQL SELECT * FROM my_table AT(TIMESTAMP => TO_TIMESTAMP('2022-01-01 00:00:00')); Use Data Sharing to securely share data with other organizations by creating shares and granting access to specific objects: SQL CREATE SHARE my_share; GRANT USAGE ON DATABASE my_database TO SHARE my_share; GRANT SELECT ON TABLE my_table TO SHARE my_share; Conclusion By delving deeper into the technical aspects of Snowflake's micro-partitioning and following the advanced developer guide provided in this article, you can harness the full potential of this powerful feature to optimize your data warehousing and analysis processes. With improved query performance, scalability, storage efficiency, and data protection, Snowflake's micro-partitioning technology is a game-changer in the world of data management.
Over the past few years, Apache Kafka has emerged as the leading standard for streaming data. Fast-forward to the present day: Kafka has achieved ubiquity, being adopted by at least 80% of the Fortune 100. This widespread adoption is attributed to Kafka's architecture, which goes far beyond basic messaging. Kafka's architectural versatility makes it exceptionally suitable for streaming data at a vast "internet" scale, ensuring the fault tolerance and data consistency that are crucial for supporting mission-critical applications.

Flink is a high-throughput, unified batch and stream processing engine, renowned for its capability to handle continuous data streams at scale. It integrates seamlessly with Kafka and offers robust support for exactly-once semantics, ensuring each event is processed precisely once, even amidst system failures. Flink is therefore a natural choice as a stream processor for Kafka. While Apache Flink enjoys significant success and popularity as a tool for real-time data processing, finding sufficient resources and current examples for learning Flink can be challenging.

In this article, I will guide you step by step through integrating Kafka 3.7.0 (the Scala 2.13 build, kafka_2.13-3.7.0) with Flink 1.18.1 to consume data from a topic and process it within Flink on a single-node cluster. Ubuntu 22.04 LTS is the operating system used on the cluster.

Assumptions
The system has a minimum of 8 GB RAM and a 250 GB SSD, with Ubuntu 22.04.2 amd64 as the operating system.
OpenJDK 11 is installed and the JAVA_HOME environment variable is configured.
Python 3 or Python 2, along with Perl 5, is available on the system.
A single-node Apache Kafka 3.7.0 cluster is up and running with Apache ZooKeeper 3.5.6. (Please read here how to set up a Kafka cluster.)

Install and Start Flink 1.18.1
The binary distribution of Flink 1.18.1 can be downloaded here. Extract the archive flink-1.18.1-bin-scala_2.12.tgz in the terminal using $ tar -xzf flink-1.18.1-bin-scala_2.12.tgz.
After successful extraction, the directory flink-1.18.1 will be created. Please make sure that the bin/, conf/, and examples/ directories are available inside it. Navigate to the flink-1.18.1 directory in the terminal and execute $ ./bin/start-cluster.sh to start the single-node Flink cluster. We can then use Flink's web UI to monitor the status of the cluster and of running jobs by pointing a browser at port 8081 of the host. The Flink cluster can be stopped by executing $ ./bin/stop-cluster.sh.

List of Dependent JARs
The following .jars should be included in the classpath/build file:

I've created a basic Java program using Eclipse IDE 23-12 to continuously consume messages within Flink from a Kafka topic. Dummy string messages are published to the topic using Kafka's built-in kafka-console-producer.sh script. When a message arrives in the Flink engine, no real data transformation is applied to it. Instead, an additional string is simply prepended to each message and the result is printed for verification, confirming that messages are continuously streamed to Flink.
Java
package com.dataview.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Constants class from the same project (not shown in this article)
import com.dataview.flink.util.IKafkaConstants;

public class readFromKafkaTopic {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source that reads string values from the topic, starting at the earliest offset
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
                .setTopics(IKafkaConstants.FIRST_TOPIC_NAME)
                .setGroupId(IKafkaConstants.GROUP_ID_CONFIG)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> messageStream =
                see.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Prepend a marker string to each message and print it for verification
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        }).print();

        see.execute();
    }
}

The entire execution has been screen-recorded. If interested, you can watch it below:

I hope you enjoyed reading this. Please stay tuned for another upcoming article where I will explain how to stream messages/data from Flink to a Kafka topic.
In Part 1 of this series, we looked at MongoDB, one of the most reliable and robust document-oriented NoSQL databases. Here in Part 2, we'll examine another NoSQL database that is hard to avoid: Elasticsearch.

More than just a popular and powerful open-source distributed NoSQL database, Elasticsearch is first of all a search and analytics engine. It is built on top of Apache Lucene, the most famous Java search engine library, and is able to perform real-time search and analysis operations on structured and unstructured data. It is designed to handle large amounts of data efficiently.

Once again, we need to point out that this short post is by no means an Elasticsearch tutorial. Accordingly, the reader is strongly advised to make extensive use of the official documentation, as well as the excellent book "Elasticsearch in Action" by Madhusudhan Konda (Manning, 2023), to learn more about the product's architecture and operations. Here, we're just reimplementing the same use case as before, this time using Elasticsearch instead of MongoDB. So, here we go!

The Domain Model
The diagram below shows our *customer-order-product* domain model:

This diagram is the same as the one presented in Part 1. Like MongoDB, Elasticsearch is a document data store and, as such, it expects documents to be presented in JSON notation. The only difference is that, to handle its data, Elasticsearch needs to have them indexed. There are several ways that data can be indexed in an Elasticsearch data store: for example, piping them from a relational database, extracting them from a filesystem, streaming them from a real-time source, etc. But whatever the ingestion method might be, it eventually consists of invoking the Elasticsearch RESTful API via a dedicated client. There are two categories of such dedicated clients:

REST-based clients like curl, Postman, HTTP modules for Java, JavaScript, Node.js, etc.
Programming language SDKs (Software Development Kits): Elasticsearch provides SDKs for the most widely used programming languages, including but not limited to Java and Python.

Indexing a new document with Elasticsearch means creating it using a POST request against a special RESTful API endpoint named _doc. For example, the following request will create a new Elasticsearch index and store a new customer instance in it.

Plain Text
POST customers/_doc/
{
  "id": 10,
  "firstName": "John",
  "lastName": "Doe",
  "email": {
    "address": "john.doe@gmail.com",
    "personal": "John Doe",
    "encodedPersonal": "John Doe",
    "type": "personal",
    "simple": true,
    "group": true
  },
  "addresses": [
    { "street": "75, rue Véronique Coulon", "city": "Coste", "country": "France" },
    { "street": "Wulfweg 827", "city": "Bautzen", "country": "Germany" }
  ]
}

Running the request above using curl or the Kibana console (as we'll see later) will produce the following result:

Plain Text
{
  "_index": "customers",
  "_id": "ZEQsJI4BbwDzNcFB0ubC",
  "_version": 1,
  "result": "created",
  "_shards": { "total": 2, "successful": 1, "failed": 0 },
  "_seq_no": 1,
  "_primary_term": 1
}

This is the standard Elasticsearch response to a POST request. It confirms that the index named customers has been created and that it now holds a new customer document, identified by an automatically generated ID (in this case, ZEQsJI4BbwDzNcFB0ubC). Other interesting parameters appear here, like _version and especially _shards. Without going into too much detail, Elasticsearch creates indexes as logical collections of documents. Just like keeping paper documents in a filing cabinet, Elasticsearch keeps documents in an index. Each index is composed of shards, which are physical instances of Apache Lucene, the engine behind the scenes responsible for getting the data in or out of the storage. They might be either primary, storing documents, or replicas, storing, as the name suggests, copies of primary shards.
More on that in the Elasticsearch documentation. For now, we just need to notice that our index named customers is composed of two shards, one of which, of course, is the primary.

A final note: the POST request above doesn't mention the ID value, as it is automatically generated. While this is probably the most common use case, we could have provided our own ID value. In that case, the HTTP request to use is no longer POST, but PUT.

To come back to our domain model diagram, as you can see, its central document is Order, stored in a dedicated collection named Orders. An Order is an aggregate of OrderItem documents, each of which points to its associated Product. An Order document also references the Customer who placed it. In Java, this is implemented as follows:

Java
public class Customer {
    private Long id;
    private String firstName, lastName;
    private InternetAddress email;
    private Set<Address> addresses;
    ...
}

The code above shows a fragment of the Customer class. This is a simple POJO (Plain Old Java Object) having properties like the customer's ID, first and last name, email address, and a set of postal addresses. Let's look now at the Order document.

Java
public class Order {
    private Long id;
    private String customerId;
    private Address shippingAddress;
    private Address billingAddress;
    private Set<String> orderItemSet = new HashSet<>();
    ...
}

Here you can notice some differences compared to the MongoDB version. With MongoDB, we were using a reference to the customer instance associated with this order. This notion of reference doesn't exist in Elasticsearch and, hence, we're using the customer's document ID to create an association between the order and the customer who placed it. The same applies to the orderItemSet property, which creates an association between the order and its items.

The rest of our domain model is quite similar and based on the same normalization ideas.
For example, the OrderItem document:

Java
public class OrderItem {
    private String id;
    private String productId;
    private BigDecimal price;
    private int amount;
    ...
}

Here, the productId property associates the order item with the product it refers to. Last but not least, we have the Product document:

Java
public class Product {
    private String id;
    private String name, description;
    private BigDecimal price;
    private Map<String, String> attributes = new HashMap<>();
    ...
}

The Data Repositories
Quarkus Panache greatly simplifies the data persistence process by supporting both the active record and the repository design patterns. In Part 1, we used the Quarkus Panache extension for MongoDB to implement our data repositories, but there is not yet an equivalent Quarkus Panache extension for Elasticsearch. Accordingly, until such an extension becomes available, we have to implement our data repositories manually using the dedicated Elasticsearch client.

Elasticsearch is written in Java and, consequently, it is not a surprise that it offers native support for invoking the Elasticsearch API through a Java client library. This library is based on the fluent API builder design pattern and provides both synchronous and asynchronous processing models. It requires Java 8 at minimum.

So, what do our fluent API builder-based data repositories look like? Below is an excerpt from the CustomerServiceImpl class, which acts as a data repository for the Customer document.

Java
@ApplicationScoped
public class CustomerServiceImpl implements CustomerService {
    private static final String INDEX = "customers";

    @Inject
    ElasticsearchClient client;

    @Override
    public String doIndex(Customer customer) throws IOException {
        return client.index(IndexRequest.of(ir -> ir.index(INDEX).document(customer))).id();
    }
    ...

As we can see, our data repository implementation must be a CDI bean having an application scope.
The Elasticsearch Java client is simply injected, thanks to the quarkus-elasticsearch-java-client Quarkus extension. This spares us a lot of boilerplate that we would otherwise have had to write. The only thing we need in order to inject the client is to declare the following property:

Properties files
quarkus.elasticsearch.hosts = elasticsearch:9200

Here, elasticsearch is the DNS (Domain Name System) name that we associate with the Elasticsearch database server in the docker-compose.yaml file, and 9200 is the TCP port number on which the server listens for connections.

The method doIndex() above creates a new index named customers if it doesn't exist and indexes (stores) into it a new document representing an instance of the class Customer. The indexing process is performed based on an IndexRequest accepting as input arguments the index name and the document body. As for the document ID, it is automatically generated and returned to the caller for further reference.

The following method retrieves the customer identified by the ID given as an input argument:

Java
...
@Override
public Customer getCustomer(String id) throws IOException {
    GetResponse<Customer> getResponse =
        client.get(GetRequest.of(gr -> gr.index(INDEX).id(id)), Customer.class);
    return getResponse.found() ? getResponse.source() : null;
}
...

The principle is the same: using the fluent API builder pattern, we construct a GetRequest instance in the same way as we did with the IndexRequest, and we run it against the Elasticsearch Java client. The other endpoints of our data repository, allowing us to perform full search operations or to update and delete customers, are designed the same way. Please take some time to look at the code to understand how things work.

The REST API
Our MongoDB REST API interface was simple to implement, thanks to the quarkus-mongodb-rest-data-panache extension, in which the annotation processor automatically generated all the required endpoints.
With Elasticsearch, we don't yet benefit from the same comfort and, hence, we need to implement it manually. That's not a big deal, as we can inject the data repositories presented previously, as shown below:

Java
@Path("customers")
@Produces(APPLICATION_JSON)
@Consumes(APPLICATION_JSON)
public class CustomerResourceImpl implements CustomerResource {
    @Inject
    CustomerService customerService;

    @Override
    public Response createCustomer(Customer customer, @Context UriInfo uriInfo) throws IOException {
        return Response.accepted(customerService.doIndex(customer)).build();
    }

    @Override
    public Response findCustomerById(String id) throws IOException {
        return Response.ok().entity(customerService.getCustomer(id)).build();
    }

    @Override
    public Response updateCustomer(Customer customer) throws IOException {
        customerService.modifyCustomer(customer);
        return Response.noContent().build();
    }

    @Override
    public Response deleteCustomerById(String id) throws IOException {
        customerService.removeCustomerById(id);
        return Response.noContent().build();
    }
}

This is the customer's REST API implementation. The other ones, associated with orders, order items, and products, are similar.

Running and Testing Our Microservices
Now that we have looked at the details of our implementation, let's see how to run and test it. We chose to do it by means of the docker-compose utility.
Here is the associated docker-compose.yml file:

YAML
version: "3.7"
services:
  elasticsearch:
    image: elasticsearch:8.12.2
    environment:
      node.name: node1
      cluster.name: elasticsearch
      discovery.type: single-node
      bootstrap.memory_lock: "true"
      xpack.security.enabled: "false"
      path.repo: /usr/share/elasticsearch/backups
      ES_JAVA_OPTS: -Xms512m -Xmx512m
    hostname: elasticsearch
    container_name: elasticsearch
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - node1-data:/usr/share/elasticsearch/data
    networks:
      - elasticsearch
  kibana:
    image: docker.elastic.co/kibana/kibana:8.6.2
    hostname: kibana
    container_name: kibana
    environment:
      - elasticsearch.url=http://elasticsearch:9200
      - csp.strict=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - 5601:5601
    networks:
      - elasticsearch
    depends_on:
      - elasticsearch
    links:
      - elasticsearch:elasticsearch
  docstore:
    image: quarkus-nosql-tests/docstore-elasticsearch:1.0-SNAPSHOT
    depends_on:
      - elasticsearch
      - kibana
    hostname: docstore
    container_name: docstore
    links:
      - elasticsearch:elasticsearch
      - kibana:kibana
    ports:
      - "8080:8080"
      - "5005:5005"
    networks:
      - elasticsearch
    environment:
      JAVA_DEBUG: "true"
      JAVA_APP_DIR: /home/jboss
      JAVA_APP_JAR: quarkus-run.jar
volumes:
  node1-data:
    driver: local
networks:
  elasticsearch:

This file instructs the docker-compose utility to run three services:

A service named elasticsearch running the Elasticsearch 8.12.2 database
A service named kibana running the multipurpose web console providing different options such as executing queries, creating aggregations, and developing dashboards and graphs
A service named docstore running our Quarkus microservice

Now, you may check that all the required processes are running:

Shell
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
005ab8ebf6c0 quarkus-nosql-tests/docstore-elasticsearch:1.0-SNAPSHOT "/opt/jboss/containe…" 3 days ago Up 3 days 0.0.0.0:5005->5005/tcp, :::5005->5005/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp, 8443/tcp docstore
9678c0a04307 docker.elastic.co/kibana/kibana:8.6.2 "/bin/tini -- /usr/l…" 3 days ago Up 3 days 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp kibana
805eba38ff6c elasticsearch:8.12.2 "/bin/tini -- /usr/l…" 3 days ago Up 3 days 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch
$

To confirm that the Elasticsearch server is available and able to run queries, you can connect to Kibana at http://localhost:5601. After scrolling down the page and selecting Dev Tools in the preferences menu, you can run queries as shown below:

In order to test the microservices, proceed as follows:

1. Clone the associated GitHub repository:
Shell
$ git clone https://github.com/nicolasduminil/docstore.git

2. Go to the project:
Shell
$ cd docstore

3. Check out the right branch:
Shell
$ git checkout elastic-search

4. Build:
Shell
$ mvn clean install

5. Run the integration tests:
Shell
$ mvn -DskipTests=false failsafe:integration-test

This last command will run the 17 provided integration tests, which should all succeed. You can also use the Swagger UI interface for testing purposes by pointing your preferred browser at http://localhost:8080/q/swagger-ui. Then, in order to test endpoints, you can use the payload in the JSON files located in the src/resources/data directory of the docstore-api project.

Enjoy!
In the landscape of data management and analytics, data lakes and data warehouses stand out as two foundational technologies. They serve distinct purposes and offer different advantages, each fitting various needs of organizations in handling big data. Understanding their differences, benefits, and trade-offs is essential for making informed decisions about which to use for specific data storage, management, and analysis needs. Data Lake A data lake is a centralized repository that allows for the storage of structured, semi-structured, and unstructured data at any scale. It can store data in its raw form without needing to first structure the data, making it highly flexible and scalable. Data lakes adopt a “schema-on-read” approach, meaning the data’s structure is not defined until the data is queried. This allows for storing vast amounts of raw, unstructured data from various sources, offering flexibility and adaptability for data analysis and discovery tasks. Data Lake representation Benefits Flexibility in data types and structures: Data lakes can store data in various formats, including logs, XML, JSON, and more. This versatility makes it ideal for organizations dealing with a wide array of data sources. Scalability and cost-effectiveness: With the ability to store vast amounts of data, data lakes leverage the scalability of cloud storage solutions, which can be more cost-effective than traditional data storage options. Advanced analytics and machine learning: Data lakes support big data analytics, machine learning models, and real-time analytics, providing deep insights and enabling data-driven decision-making. Trade-Offs Complex data management: Without proper governance and management, data lakes can become “data swamps,” where unorganized and outdated data makes it challenging to find and utilize information. 
Security and compliance risks: Managing access and ensuring security for a wide variety of data types can be complex, requiring sophisticated security measures to protect sensitive information. Data Warehouse A data warehouse is a system used for reporting and data analysis, acting as a repository of structured data extracted from various sources. The data is processed, transformed, and loaded into a structured format, making it suitable for querying and analysis. Data warehouses use a “schema-on-write” methodology, where data is cleansed, structured, and defined before storage. This ensures that the data is ready for querying and analysis, facilitating fast and reliable reporting but requiring upfront data modeling efforts. Data Warehouse representation Benefits Structured for easy access: Data is organized into schemas and optimized for SQL queries, making it easier for users to perform complex analyses and generate reports. High performance: Data warehouses are designed to handle complex queries efficiently. They support large volumes of data and numerous simultaneous queries, providing quick and reliable access to insights. Historical data analysis: They excel in storing historical data, enabling trend analysis over time, and helping in forecasting and decision-making. Data integrity and quality: The process of transforming data into a structured format ensures consistency, accuracy, and reliability of the data stored in data warehouses. Trade-Offs Constraints on data types: Data warehouses are less adaptable to unstructured data, requiring data to be converted into a structured format before it can be stored and analyzed. Cost and complexity in scaling: Traditional data warehouses can be expensive and complex to scale, especially as data volume grows. 
To understand this point, you can read my paper on the CAP theorem, which explains how databases are classified and their inherent limitations: Navigating the CAP Theorem: In search of the perfect database Longer setup and integration time: Setting up a data warehouse and integrating various data sources can be time-consuming, requiring significant upfront investment in planning and development. Conclusion Both data lakes and data warehouses offer valuable capabilities for data storage, management, and analysis. The choice between them depends on the specific needs of an organization, such as the types of data being dealt with, the intended use of the data, and the desired balance between flexibility and structure. For organizations prioritizing flexibility in handling various data types and formats, and focusing on advanced analytics, a data lake might be the more suitable option. On the other hand, for those requiring fast, reliable access to structured data for reporting and historical analysis, a data warehouse could be the better choice. In many cases, organizations find value in utilizing both technologies in a complementary manner, leveraging the strengths of each to meet their comprehensive data management and analysis needs. This hybrid approach ensures that businesses can harness the power of their data effectively, driving insights and decisions that propel them forward.
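The difference between the "schema-on-write" and "schema-on-read" approaches described in the two sections above can be sketched in plain Python. This is a toy illustration under stated assumptions, not how any particular product works: the `SCHEMA` mapping, the function names, and the sample payloads are all hypothetical.

```python
import json

SCHEMA = {"id": int, "amount": float}   # hypothetical warehouse table schema


def write_to_warehouse(table, record):
    """Schema-on-write: validate and coerce before storing; reject bad records."""
    row = {}
    for column, column_type in SCHEMA.items():
        if column not in record:
            raise ValueError(f"missing column: {column}")
        row[column] = column_type(record[column])
    table.append(row)


def write_to_lake(lake, raw_text):
    """Schema-on-read: store the raw payload untouched."""
    lake.append(raw_text)


def read_amounts_from_lake(lake):
    """Structure is imposed only at query time; unparseable items are skipped."""
    amounts = []
    for raw_text in lake:
        try:
            amounts.append(float(json.loads(raw_text)["amount"]))
        except (ValueError, KeyError, TypeError):
            continue
    return amounts


warehouse, lake = [], []
write_to_warehouse(warehouse, {"id": "1", "amount": "9.5"})
try:
    write_to_warehouse(warehouse, {"id": 2})    # no amount column
    rejected = False
except ValueError:
    rejected = True

for payload in ('{"id": 1, "amount": 9.5}', "free-form log line", '{"id": 3, "amount": "4"}'):
    write_to_lake(lake, payload)
amounts = read_amounts_from_lake(lake)
```

The warehouse path pays the modeling cost up front and refuses the incomplete record, while the lake path accepts everything, including the free-form line, and leaves it to each query to decide what counts as usable data.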
Columnar storage is a commonly used storage technique. It is often associated with high performance and has basically become a standard configuration for today's analytical databases.

Understanding Columnar Storage
The basic principle of columnar storage is reducing the amount of data retrieved from the hard disk. A data table can have a lot of columns, but the computation may use only a very small number of them. With columnar storage, unneeded columns do not have to be retrieved, while with row-wise storage, all columns need to be scanned. When the retrieved columns take up only a very small part of the total, columnar storage has a big advantage in terms of IO time, and computation seems to get much faster. But columnar storage also has another side: it isn't the fastest in every scenario.

Implementation Challenges of Columnar Storage
Implementing columnar storage is much more complex than implementing row-wise storage because, for a data table, the number of columns can be determined in advance, but the number of rows will not stop growing. With row-wise storage, we write and append data to the table according to the order of records, and it is easy to store the data table as a single file. But this does not work for data stored in columnar format. Since data will keep being appended, we cannot know the number of rows beforehand, and it is thus impossible to finish writing one column and then move on to the next. Generally, we divide the storage space into a number of blocks, write a fixed number of rows (represented by N) to one block, and then move to the next block when that one is full. Later, data will be retrieved block by block. In each block, data is stored in the column-wise format, while between blocks, data can be regarded as stored row-wise. A special management module is needed, in which a table of contents records information on the continuously growing data blocks and every column they store, and this causes a lot of inconvenience.
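The block layout described above can be sketched as follows. This is a minimal in-memory illustration, assuming rows are dictionaries; the function names `write_blocks` and `read_column` and the table-of-contents fields are invented for the example and do not correspond to any specific product's file format.

```python
def write_blocks(rows, columns, n):
    """Split rows into blocks of at most n rows and store each block column-wise."""
    blocks, contents = [], []
    for start in range(0, len(rows), n):
        chunk = rows[start:start + n]
        # Inside a block, the values of each column are kept together.
        block = {c: [row[c] for row in chunk] for c in columns}
        blocks.append(block)
        # The table of contents records where each block starts and how big it is.
        contents.append({"block": len(blocks) - 1, "first_row": start, "rows": len(chunk)})
    return blocks, contents


def read_column(blocks, column):
    """Read one column block by block without touching the other columns."""
    values = []
    for block in blocks:
        values.extend(block[column])
    return values


rows = [{"id": i, "price": i * 1.5, "note": f"row {i}"} for i in range(5)]
blocks, contents = write_blocks(rows, ["id", "price", "note"], n=2)
prices = read_column(blocks, "price")
```

With N = 2 and five rows, three blocks are produced and the table of contents gains one entry per block, which is why it keeps growing as data is appended.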
So, it is difficult to implement columnar storage in a single data file, and the storage schema is usually adopted by specialized data warehouse products. However, the block storage mechanism is unfriendly to parallel processing when the amount of data is not large. Parallel processing requires that data be divided into multiple segments, which has two requirements: each segment holds an almost equal amount of data (an equal processing load for each thread), and segmentation is flexible (the number of parallel tasks cannot be determined beforehand). Row-wise data can be segmented by number of rows, so parallel processing is feasible even for a very small amount of data. Column-wise data can only be divided at block boundaries; a block cannot be divided further. The number of records per block (the above-mentioned N) should not be too small; otherwise, too many resources are wasted because the disk has a smallest retrieval unit. In the extreme case of N=1, the storage schema is equivalent to row-wise storage. When N is too small and the total amount of data is huge, the table of contents becomes very large and overburdens content management. So, N is usually set to one million or above. To segment data flexibly, there need to be at least hundreds of data blocks. That is to say, parallel computation on column-wise data becomes smooth only when the total reaches at least hundreds of millions of rows. esProc SPL offers the double increment segmentation strategy, which makes N grow as the amount of data increases while keeping the number of data blocks the same. This way, the size of the table of contents can also be fixed, columnar storage can be conveniently implemented in a single file, and flexible segmentation is available for parallel computation even on a small amount of data.
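One plausible reading of a doubling strategy is sketched below in Python. It is an illustration of the general idea only and should not be taken as esProc SPL's actual implementation: the class `DoublingToc`, its fields, and the table-of-contents size of 8 are hypothetical. The table of contents has a fixed capacity; when it fills up, every second entry is dropped so that each remaining entry covers twice as many rows, and N doubles for subsequent writes.

```python
from __future__ import annotations


class DoublingToc:
    """Fixed-size table of contents whose segment size N doubles when it fills up."""

    def __init__(self, max_entries: int = 8) -> None:
        self.max_entries = max_entries   # fixed TOC capacity (assumed even)
        self.rows_per_segment = 1        # the article's N; grows over time
        self.data: list[int] = []        # stands in for the append-only data file
        self.starts: list[int] = []      # TOC: first row number of each segment

    def append(self, record: int) -> None:
        row = len(self.data)
        last_full = not self.starts or row - self.starts[-1] >= self.rows_per_segment
        if last_full:
            if len(self.starts) == self.max_entries:
                # Keep every second entry: half as many segments, each covering 2N rows.
                self.starts = self.starts[::2]
                self.rows_per_segment *= 2
            self.starts.append(row)
        self.data.append(record)

    def split_for(self, workers: int) -> list[tuple[int, int]]:
        """Give each worker a contiguous run of whole segments as a (start, end) row range."""
        edges = self.starts + [len(self.data)]
        count = len(self.starts)
        cuts = [count * w // workers for w in range(workers + 1)]
        return [(edges[cuts[w]], edges[cuts[w + 1]]) for w in range(workers)]


toc = DoublingToc(max_entries=8)
for value in range(100):
    toc.append(value)

print(toc.rows_per_segment, toc.starts)
print(toc.split_for(2))
```

In this sketch only the table-of-contents entries are merged; the data itself is never moved. After the first fill, the number of entries stays between half the capacity and the full capacity, so even a small table keeps enough segment boundaries to be divided among several workers.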
According to the principle of columnar storage, the storage schema brings an obvious advantage only when the computation involves a relatively small number of columns. Many performance test cases (such as TPC-H, a widely used benchmark standard) choose such computing scenarios, which makes them convenient for bringing out the advantages of columnar databases. But those are only a part of real-life business scenarios. In the finance industry, it is not rare for a computation to involve most of the columns in a table that has over one hundred columns. In that case, columnar storage realizes only part of its advantage. Even though columnar storage has a higher compression ratio and retrieves less data than row-wise storage, its advantage is not that noticeable when many columns participate in the computation. After all, retrieving data stored column-wise is much more complex than retrieving data stored row-wise. Therefore, it is normal for a real-world computation to perform worse than the test case, and this does not mean that the test result is fake.
Performance Considerations in Columnar Storage
Columnar storage also leads to random disk accesses. Data in each column is stored contiguously, but data in different columns isn’t. The more columns are retrieved, the more random the disk access becomes, even with a single-thread task. For SSDs, this isn’t a very serious problem: when data in each column is contiguous and the above-mentioned N is big enough, the retrieval overhead takes up a very small proportion of the cost, and SSDs have no seek time. But for HDDs, which do have seek time, the problem becomes disastrous. When a lot of columns are accessed, performance may not even be as good as that of row-wise storage. Both concurrency and parallel processing worsen the problem.
On the other hand, increasing the size of the cache to alleviate the problem will occupy too much memory space. Be cautious when you try to use columnar storage on HDDs. Another big problem with columnar storage is that it has much lower indexing performance than row-wise storage. An index table stores ordered key values and the positions of their corresponding records in the original table. For row-wise storage, the position of a record can be represented by one number, but for columnar storage, each column of a record has a different position, and, in principle, these positions should all be recorded. This creates an index table almost as big as the original table, which leads to heavy storage utilization and high retrieval costs. There isn’t much difference between this and copying the original table and sorting it.
Choosing the Right Storage Schema
Of course, no one does that in real-world practice. The general approach is still the previously mentioned block storage mechanism. The index only stores the ordinal numbers of the records. A search reads an ordinal number from the index table, locates the corresponding block, “counts” from the first record in the block to the one with that ordinal number, and retrieves the column value. The “count” action is performed on each column. In the best-case scenario, a number of disk units equal to the number of columns will be read; if you are not lucky, the whole block will be scanned. By contrast, an index for row-wise storage generally only needs to read one or two disk units (determined by the space the record occupies). The amount of data retrieved under columnar storage is dozens of times, or even a hundred times, more than that under row-wise storage. With HDDs, there is also the unbearable seek time. Therefore, columnar storage basically cannot handle high-concurrency query requirements. Use columnar storage for traversal and row-wise storage for search.
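The ordinal-based lookup described above can be sketched as follows. This is a toy in-memory model under stated assumptions: the table contents, the helper `to_blocks`, the `lookup` function, and the tiny `ROWS_PER_BLOCK` (the article's N) are all hypothetical, and each list slice stands in for one separate disk read.

```python
from bisect import bisect_left

ROWS_PER_BLOCK = 4  # the article's N; hypothetical and tiny for the demo

# Three columns of a 10-row table (made-up values).
ids = [101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
amounts = [i * 10 for i in range(10)]
regions = ["US", "EU"] * 5


def to_blocks(values):
    """Split one column into per-block chunks of at most ROWS_PER_BLOCK values."""
    return [values[i:i + ROWS_PER_BLOCK] for i in range(0, len(values), ROWS_PER_BLOCK)]


blocks = {"id": to_blocks(ids), "amount": to_blocks(amounts), "region": to_blocks(regions)}

# Index table: sorted (key, ordinal) pairs, where ordinal is the record's row number.
index = sorted((key, ordinal) for ordinal, key in enumerate(ids))
keys = [key for key, _ in index]


def lookup(key):
    """Return (record, chunks_read) for a key, or (None, 0) if it is absent."""
    pos = bisect_left(keys, key)
    if pos == len(keys) or keys[pos] != key:
        return None, 0
    ordinal = index[pos][1]
    block_no, offset = divmod(ordinal, ROWS_PER_BLOCK)
    record = {}
    chunks_read = 0
    for name, column_blocks in blocks.items():
        chunk = column_blocks[block_no]  # one separate read per column
        chunks_read += 1
        record[name] = chunk[offset]     # "count" to the record inside the chunk
    return record, chunks_read


print(lookup(107))
```

Even in this small model, fetching one record touches one chunk per column, whereas a row-wise layout would fetch the whole record from a single position.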
For data on which both traversal and search require high performance, it may even be necessary to store two redundant copies of the data. The data platform should permit programmers to adopt the most suitable storage schema for each computing scenario rather than making the same decision for all scenarios. Users can then select the storage schema that best fits their needs and pair it with efficient indexing strategies to improve search performance in their database systems.
Conclusion
In conclusion, while columnar storage offers advantages such as reduced data retrieval and improved IO time, its implementation complexity, its random disk accesses, and its lower indexing performance compared to row-wise storage require careful consideration. Organizations should choose storage schemas based on their specific needs, using columnar storage for traversal and row-wise storage for search to optimize performance. Understanding these principles and the implementation challenges is key to using columnar storage effectively for analytical database needs.
Miguel Garcia, VP of Engineering, Nextail Labs
Gautam Goswami, Founder, DataView