Big data comprises datasets that are massive, varied, complex, and can't be handled traditionally. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly more crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
Any trustworthy data streaming pipeline needs to be able to identify and handle faults. Exceptionally while IoT devices ingest endlessly critical data/events into permanent persistence storage like RDBMS for future analysis via multi-node Apache Kafka cluster. (Please click here to read how to set up a multi-node Apache Kafka Cluster). There could be scenarios where IoT devices might send fault/bad events due to various reasons at the source points, and henceforth appropriate actions can be executed to correct it further. The Apache Kafka architecture does not include any filtering and error handling mechanism within the broker so that maximum performance/scale can be yielded. Instead, it is included in Kafka Connect, which is an integration framework of Apache Kafka. As a default behavior, if a problem arises as a result of consuming an invalid message, the Kafka Connect task terminates, and the same applies to JDBC Sink Connector. Kafka Connect has been classified into two categories, namely Source (to ingest data from various data generation sources and transport to the topic) and Sink (to consume data/messages from the topic and send them eventually to various destinations). Without implementing a strict filtering mechanism or exception handling, we can ingest/publishes messages inclusive of wrong formatted to the Kafka topic because the Kafka topic accepts all messages or records as byte arrays in key-value pairs. But by default, the Kafka Connect task stops if an error occurs because of consuming an invalid message, and on top of that JDBC sink connector additionally won’t work if there is an ambiguity in the message schema. The biggest difficulty with the JDBC sink connector is that it requires knowledge of the schema of data that has already landed on the Kafka topic. Schema Registry must therefore be integrated as a separate component with the exiting Kafka cluster to transfer the data into the RDBMS. Therefore, to sink data from the Kafka topic to the RDBMS, the producers must publish messages/data containing the schema. You could read here to learn streaming Data via Kafka JDBC Sink Connector without leveraging Schema Registry from the Kafka topic. Since Apache Kafka 2.0, Kafka Connect has integrated error management features, such as the ability to reroute messages to a dead letter queue. In the Kafka cluster, a dead letter queue (DLQ) is a straightforward topic that serves as the destination for messages that, for some reason, were unable to reach their intended recipients, especially for JDBC sink connector, tables in RDBMS There might be two major reasons why the JDBC Kafka sink connector stops working abruptly while consuming messages from the topic: Ambiguity between data types and the actual payload Junk data in payload or wrong schema There is no complicacy of DLQ configuration in the JDBC sink connector. The following parameters need to be added in the sink configuration file (.properties file): errors.tolerance=allerrors.deadletterqueue.topic.name= <<Name of the DLQ Toic>>errors.deadletterqueue.topic.replication.factor= <<No of replication>>Note:- No of replication should be equal or less then the number of Kafka broker in the cluster. The DLQ topic would be created automatically with the above-mentioned replication factor when we start the JDBC sink connector for the first time. When an error occurs, or bad data is encountered by the JDBC sink connector while consuming messages from the topic, these unprocessed messages/bad data would be forwarded straightly to the DLQ. Subsequently, correct messages or data will send to the respective RDBMS tables continuously and again in between. If bad messages are encountered, then the same would be forwarded to the DLQ and so on. After landing the bad or erroneous messages on the DLQ, we will have two options either manually introspect each message to understand the root cause of the error or implement a mechanism to reprocess the bad messages and push them eventually to the consumers for JDBC sink connector the destination should be RDBMS tables. Dead letter queues are not enabled by default in Kafka Connect due to the above reason. Even though Kafka Connect supports several error-handling strategies, such as dead letter queues, silently ignoring, and failing quickly, the adoption of DLQ would be the best approach while configuring the JDBC sink connector. Decoupling completely the bad/error messages handling from the normal messages/data transportation from the Kafka topic would boost the overall efficiency of the entire system as well as allow the development team to develop an independent error handling mechanism from easy maintainability perspectives. Hope you have enjoyed this read. Please like and share if you feel this composition is valuable.
As data becomes increasingly important for businesses, the need for scalable, efficient, and cost-effective data storage and processing solutions is more critical than ever. Data Lakehouses have emerged as a powerful tool to help organizations harness the benefits of both Data Lakes and Data Warehouses. In the first article, we highlighted key benefits of Data Lakehouses for businesses, while the second article delved into the architectural details. In this article, we will focus on three popular Data Lakehouse solutions: Delta Lake, Apache Hudi, and Apache Iceberg. We will explore the key features, strengths, and weaknesses of each solution to help you make an informed decision about the best fit for your organization's data management needs. Data Lakehouse Innovations: Exploring the Genesis and Features of Delta Lake, Apache Hudi, and Apache Iceberg The three Data Lakehouse solutions we will discuss in this article — Delta Lake, Apache Hudi, and Apache Iceberg — have all emerged to address the challenges of managing massive amounts of data and providing efficient query performance for big data workloads. Although they share some common goals and characteristics, each solution has its unique features, strengths, and weaknesses. Delta Lake was created by Databricks and is built on top of Apache Spark, a popular distributed computing system for big data processing. It was designed to bring ACID transactions, scalable metadata handling, and unification of batch and streaming data processing to Data Lakes. Delta Lake has quickly gained traction in the big data community due to its compatibility with a wide range of data platforms and tools, as well as its seamless integration with the Apache Spark ecosystem. Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source project developed by Uber to efficiently manage large-scale analytical datasets on Hadoop-compatible distributed storage systems. Hudi provides upserts and incremental processing capabilities to handle real-time data ingestion, allowing for faster data processing and improved query performance. With its flexible storage and indexing mechanisms, Hudi supports a wide range of analytical workloads and data processing pipelines. Apache Iceberg is an open table format for large-scale, high-performance data management, initially developed by Netflix. Iceberg aims to provide a more robust and efficient foundation for data lake storage, addressing the limitations of existing storage solutions like Apache Hive and Apache Parquet. One of its most significant innovations is the use of a flexible and powerful schema evolution mechanism, which allows users to evolve table schema without rewriting existing data. Iceberg also focuses on improving metadata management, making it scalable and efficient for very large datasets. Each of these solutions has evolved in response to specific needs and challenges in the big data landscape, and they all bring valuable innovations to the Data Lakehouse concept. In the following sections, we will delve into the technical aspects of each solution, examining their data storage and file formats, data versioning and history, data processing capabilities, query performance optimizations, and the technologies and infrastructure required for their deployment. Navigating Delta Lake: Key Aspects of Data Storage, Processing, and Access Delta Lake employs the open-source Parquet file format, a columnar storage format optimized for analytical workloads. It enhances the format by introducing an ACID transaction log, which maintains a record of all operations performed on the dataset. This transaction log, combined with the file storage structure, ensures reliability and consistency in the data. Data versioning and history are essential aspects of Delta Lake, enabling users to track changes and roll back to previous versions if necessary. The transaction log records every operation, thus providing a historical view of the data and allowing for time-travel queries. Delta Lake ensures efficient query performance by implementing various optimization techniques. One such technique is data compaction, which combines small files into larger ones to improve read performance. Furthermore, it employs a mechanism called Z-Ordering to optimize the organization of data on disk, which reduces the amount of data read during queries. For data access, Delta Lake provides a simple and unified API to read and query data from the tables. You can use time-travel queries to access historical versions of your data or perform complex analytical operations using the supported query engines. To store data in Delta Lake format, data must first be processed and saved in the appropriate file format. Here's an example code snippet for writing data to a Delta Lake table using Apache Spark with following reading: Python from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Delta Lake Write and Read Example") \ .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0") \ .getOrCreate() # Read data from a source, e.g., a CSV file data = spark.read.format("csv").load("/path/to/csv-file") # Write data to a Delta Lake table data.write.format("delta") \ .mode("overwrite") \ .save("/path/to/delta-lake-table") # Read data from the Delta Lake table delta_data = spark.read.format("delta") \ .load("/path/to/delta-lake-table") # Perform some transformations and actions on the data result = delta_data.filter("some_condition").groupBy("some_column").count() result.show() In the code snippet above, we use the "delta" format to write and read data to and from a Delta Lake table. The Delta Lake library is included in the Spark session by adding the "io.delta:delta-core_2.12:2.3.0" package to the "spark.jars.packages" configuration. Delta Lake supports a wide range of query engines, including Apache Spark, Databricks Runtime, and Presto. It also provides APIs for programming languages such as Scala, Python, SQL, and Java, enabling seamless integration with existing data processing pipelines. Delta Lake integrates with various data platforms and tools, such as Apache Hive, Apache Flink, and Apache Kafka. In terms of deployment, it can be utilized in on-premises environments, as well as in cloud platforms like AWS, Azure, and GCP. For storage, Delta Lake can work with distributed file systems like HDFS or cloud-based storage services such as Amazon S3, Azure Data Lake Storage, and Google Cloud Storage. Data Management in Apache Hudi: Exploring Its Core Components Apache Hudi is another powerful Data Lakehouse solution that provides efficient data storage and querying capabilities. Like Delta Lake, it also uses Parquet as its underlying file format and adds a transaction log for ACID compliance. Hudi's storage management system enables upserts, incremental processing, and rollback support, allowing for efficient data ingestion and access. One of the key aspects of Apache Hudi is its built-in support for data partitioning, which helps optimize query performance by reducing the amount of data scanned during query execution. Hudi also provides a mechanism called "indexing" to enable fast record-level lookups, updates, and deletes. Hudi supports various query engines, including Apache Spark, Apache Hive, and Presto, and offers APIs for languages like Scala, Python, SQL, and Java. This flexibility ensures seamless integration with your existing data processing infrastructure. To write and read data using Apache Hudi, you can use the following code snippet: Python from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Apache Hudi Write and Read Example") \ .config("spark.jars", "path/to/hudi-spark-bundle.jar") \ .getOrCreate() # Read data from a source, e.g., a CSV file data = spark.read.format("csv").load("/path/to/csv-file") # Write data to an Apache Hudi table data.write.format("org.apache.hudi") \ .options(get_hudi_options()) \ .mode("overwrite") \ .save("/path/to/hudi-table") # Read data from the Apache Hudi table hudi_data = spark.read.format("org.apache.hudi") \ .load("/path/to/hudi-table/*") # Perform some transformations and actions on the data result = hudi_data.filter("some_condition").groupBy("some_column").count() result.show() In the example above, the "org.apache.hudi" format is specified for writing data to an Apache Hudi table. The required Hudi library is added to the Spark session by specifying the "hudi-spark-bundle.jar" path in the "spark.jars" configuration. Apache Iceberg Basics: A Journey Through Data Management Fundamentals Apache Iceberg is a relatively new addition to the Data Lakehouse landscape. It is an open table format that provides strong consistency, snapshot isolation, and efficient query performance. Like Delta Lake and Apache Hudi, Iceberg also uses Parquet as its underlying file format and builds additional features on top of it. Iceberg's schema evolution mechanism is one of its most significant innovations. It allows users to evolve table schema without the need to rewrite existing data. This capability makes it possible to add, delete, or update columns in a table while preserving the existing data layout. Another key aspect of Iceberg is its scalable and efficient metadata management system. It uses a combination of manifest files and metadata tables to store information about table data, making it easier to manage large datasets. Iceberg optimizes query performance by employing techniques like predicate pushdown, which reduces the amount of data read during query execution. Iceberg supports a variety of query engines, including Apache Spark, Apache Flink, and Trino (formerly known as PrestoSQL). It also provides APIs for programming languages such as Scala, Python, SQL, and Java, ensuring seamless integration with your existing data processing infrastructure. To write and read data using Apache Iceberg, you can use the following code snippet: Python from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Apache Iceberg Write and Read Demonstration") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark3-runtime:0.13.2") \ .getOrCreate() # Load data from a source, such as a CSV file data = spark.read.format("csv").load("/path/to/csv-file") # Write data to an Apache Iceberg table data.write.format("iceberg") \ .mode("overwrite") \ .save("iceberg_catalog_namespace.table_name") # Load data from the Apache Iceberg table iceberg_data = spark.read.format("iceberg") \ .load("iceberg_catalog_namespace.table_name") # Apply transformations and actions to the data result = iceberg_data.filter("some_condition").groupBy("some_column").count() result.show() In the example above, the "iceberg" format is specified for writing data to an Apache Iceberg table. The Iceberg library is included in the Spark session by adding the "org.apache.iceberg:iceberg-spark3-runtime:0.13.2" package to the "spark.jars.packages" configuration. Iceberg can be deployed in on-premises environments or cloud platforms like AWS, Azure, and GCP. It supports various storage systems, including distributed file systems like HDFS and cloud-based storage services such as Amazon S3, Azure Data Lake Storage, and Google Cloud Storage. Weighing the Pros and Cons: Analyzing Delta Lake, Apache Hudi, and Apache Iceberg To help you make an informed decision about which Data Lakehouse solution is best for your organization, we have compared the features of Delta Lake, Apache Hudi, and Apache Iceberg using a set of factors. In the table below, each factor is evaluated as supported (+), unsupported (-), or partly supported (±). CRITERION DELTA LAKE APACHE HUDI APACHE ICEBERG ACID Transactions + + + Schema Evolution + + + Time Travel (Data Versioning) + + + Data Partitioning + + + Upserts and Deletes + + + Incremental Processing + + +- Data Deduplication +- + +- Metadata Scalability + +- + Compaction Management +- + + Merge on Read and Copy on Write Storage - + - Query Optimization Techniques + +- + Support for Multiple Query Engines + + + Integration with Other Data Platforms and Tools + + + Cloud-native Storage Compatibility + + + Ease of Deployment and Management + +- + This table provides a high-level comparison of the features supported by Delta Lake, Apache Hudi, and Apache Iceberg. It is important to note that each solution has its unique strengths and trade-offs, and the best choice for a specific use case depends on the organization's requirements, existing infrastructure, and familiarity with the technologies involved. Summing Up the Data Lakehouse Landscape: Key Insights and Analysis In conclusion, the Data Lakehouse concept has emerged as a promising solution to address the challenges of traditional data warehouses and data lakes, providing a unified platform for scalable, reliable, and efficient data management. As organizations strive to harness the power of their data, selecting the right Data Lakehouse solution becomes crucial for optimizing performance and adaptability. Throughout this comparison, we have examined the key aspects of three prominent Data Lakehouse solutions: Delta Lake, Apache Hudi, and Apache Iceberg. Each of these solutions has its unique strengths and trade-offs, catering to a variety of use cases and requirements. By assessing their data storage, processing, and access capabilities, as well as their integration with existing technologies and infrastructure, organizations can make informed decisions on which solution best aligns with their needs. While the comparison table highlights the high-level differences between Delta Lake, Apache Hudi, and Apache Iceberg, it is essential to consider the specific requirements and constraints of each organization. Factors such as ease of deployment, compatibility with current infrastructure, and familiarity with the underlying technologies can significantly impact the success of a Data Lakehouse implementation. In our next article, we will delve deeper into the technologies used for implementing Data Lakehouses, exploring the underlying mechanisms, tools, and best practices that can help organizations optimize their data management strategies. Ultimately, the choice between Delta Lake, Apache Hudi, and Apache Iceberg will depend on a careful evaluation of their respective features, trade-offs, and alignment with the organization's objectives. By thoroughly understanding the capabilities of each solution, organizations can ensure a future-proof Data Lakehouse infrastructure that facilitates data-driven decision-making and unlocks new insights to drive business growth.
In the world of big data and analytics, processing and managing vast amounts of data efficiently and effectively is a critical challenge. Data engineers play a pivotal role in designing and implementing solutions to handle this data deluge. One such approach that has gained popularity in recent years is the Lambda Architecture, a powerful framework for building scalable and robust data processing pipelines. In this article, we will explore the Lambda Architecture in detail, understanding its key concepts, benefits, and challenges. What Is Lambda Architecture? The Lambda Architecture is a data processing architecture that combines batch processing with real-time/stream processing to handle large volumes of data in a distributed and fault-tolerant manner. It was introduced by Nathan Marz in his book "Big Data: Principles and best practices of scalable real-time data systems" and has since become a widely adopted approach in the field of data engineering. The Lambda Architecture follows a "speed layer" and a "batch layer" approach, where data is processed in parallel through both layers, and the results are combined to produce a single output. The speed layer deals with real-time data processing and provides low-latency responses, while the batch layer handles large-scale data processing and provides comprehensive results. The combination of both layers allows for near-real-time processing of incoming data while also supporting historical data analysis. Key Concepts of Lambda Architecture The Lambda Architecture is based on a few fundamental concepts that make it unique and powerful: Batch Layer: The batch layer is responsible for processing and analyzing large volumes of data in batch mode. It can handle data in a distributed and parallel manner, making it highly scalable. Typically, it uses batch processing frameworks such as Apache Hadoop or Apache Spark to process data stored in distributed file systems like Hadoop Distributed File System (HDFS) or Amazon S3. The batch layer generates batch views, which are immutable and historical representations of the data. Speed Layer: The speed layer is responsible for processing and analyzing real-time data streams in near real-time. It deals with high-velocity data and provides low-latency responses. It uses stream processing frameworks such as Apache Kafka or Apache Flink to process data in real time as it arrives. The speed layer generates real-time views, which are continuously updated and provide up-to-date insights. Serving Layer: The serving layer is responsible for serving the results generated by the batch and speed layers to the end users. It combines batch views and real-time views to provide a comprehensive view of the data. The serving layer uses technologies like Apache Cassandra or Apache HBase to store and serve the computed results in a distributed and fault-tolerant manner. Data Lake: The data lake is a central repository that stores all the raw and processed data. It acts as the source of truth for the Lambda Architecture, providing a scalable and durable storage solution for all the data ingested into the system. Popular data lake technologies include Apache Hadoop, Amazon S3, and Google Cloud Storage. Benefits of Lambda Architecture The Lambda Architecture offers several benefits that make it a popular choice for data engineering: Scalability: The Lambda Architecture is highly scalable, as it can process large volumes of data in a distributed and parallel manner. This makes it suitable for handling big data workloads and allows for horizontal scaling as data volumes grow. Fault-tolerance: The Lambda Architecture is designed to be fault-tolerant, as it replicates data across multiple nodes and uses distributed file systems and databases. This ensures high availability and data durability, even in the presence of hardware failures or other issues. Real-time processing: The Lambda Architecture allows for the processing of real-time data streams, providing low-latency responses and enabling near-real-time analytics. This is crucial for use cases that require real-time insights and actions, such as fraud detection, anomaly detection, recommendation systems, and IoT applications. Flexibility: The Lambda Architecture provides flexibility in data processing, as it allows for both batch processing and real-time/stream processing. This enables organizations to handle a wide variety of data types, including structured and unstructured data, and process them in a way that best suits their needs. Data integrity: The Lambda Architecture ensures data integrity by maintaining immutable batch views and continuously updated real-time views. This makes it easier to trace and audit changes in the data over time, ensuring data consistency and reliability. Extensibility: The Lambda Architecture is highly extensible, as it allows for incorporating new data sources, processing frameworks, or analytics algorithms as needed. This makes it adaptable to changing business requirements and evolving data landscapes. Challenges of Lambda Architecture While Lambda Architecture offers many benefits, it also comes with some challenges: Complexity: The Lambda Architecture can be complex to implement and manage, as it requires a combination of batch processing, real-time/stream processing, and serving layer technologies. This may require specialized skills and expertise in different technologies, making it challenging to set up and maintain. Data consistency: Maintaining consistency between batch views and real-time views can be challenging, as batch processing and real-time/stream processing may produce different results due to differences in processing times and windowing techniques. Ensuring data consistency across both layers requires careful attention to data synchronization and versioning. System complexity: The Lambda Architecture introduces additional complexity in managing and monitoring multiple layers, such as the batch layer, speed layer, serving layer, and data lake. This may require sophisticated monitoring, logging, and alerting mechanisms to ensure smooth operations and timely issue detection. Operational overhead: Managing a distributed and fault-tolerant system like the Lambda Architecture may require additional operational overhead, such as setting up and managing clusters, monitoring performance, optimizing resource utilization, and handling failures. This may require additional resources and effort to manage the system effectively. Conclusion The Lambda Architecture is a powerful approach to data engineering that combines batch processing and real-time/stream processing to handle large volumes of data in a distributed and fault-tolerant manner. It offers benefits such as scalability, fault tolerance, real-time processing, flexibility, data integrity, and extensibility. However, it also comes with challenges such as complexity, data consistency, system complexity, and operational overhead. Organizations need to carefully consider their specific requirements, resources, and expertise before implementing the Lambda Architecture. When implemented correctly, the Lambda Architecture can provide a robust and scalable solution for processing big data and generating valuable insights in real-time and batch mode.
Spark is an analytics engine for large-scale data engineering. Despite its long history, it still has its well-deserved place in the big data landscape. QuestDB, on the other hand, is a time-series database with a very high data ingestion rate. This means that Spark desperately needs data, a lot of it! ...and QuestDB has it, a match made in heaven. Of course, there are pandas for data analytics! The key here is the expression large-scale. Unlike pandas, Spark is a distributed system and can scale really well. What does this mean exactly? Let's take a look at how data is processed in Spark. For the purposes of this article, we only need to know that a Spark job consists of multiple tasks, and each task works on a single data partition. Tasks are executed parallel in stages, distributed on the cluster. Stages have a dependency on the previous ones; tasks from different stages cannot run in parallel. The schematic diagram below depicts an example job: In this tutorial, we will load data from a QuestDB table into a Spark application and explore the inner working of Spark to refine data loading. Finally, we will modify and save the data back to QuestDB. Loading Data to Spark First thing first, we need to load time-series data from QuestDB. I will use an existing table, trades, with just over 1.3 million rows. It contains bitcoin trades spanning over 3 days: not exactly a big data scenario but good enough to experiment. The table contains the following columns: Column Name Column Type symbol SYMBOL side SYMBOL price DOUBLE amount DOUBLE timestamp TIMESTAMP The table is partitioned by day and the timestamp column serves as the designated timestamp. QuestDB accepts connections via Postgres wire protocol, so we can use JDBC to integrate. You can choose from various languages to create Spark applications, and here we will go for Python. Create the script, sparktest.py: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() # print the number of rows print(df.count()) # do some filtering and print the first 3 rows of the data df = df.filter(df.symbol == 'BTC-USD').filter(df.side == 'buy') df.show(3, truncate=False) Believe it or not, this tiny application already reads data from the database when submitted as a Spark job. Shell spark-submit --jars postgresql-42.5.1.jar sparktest.py The job prints the following row count: Shell 1322570 And these are the first 3 rows of the filtered table: Shell +-------+----+--------+---------+--------------------------+ |symbol |side|price |amount |timestamp | +-------+----+--------+---------+--------------------------+ |BTC-USD|buy |23128.72|6.4065E-4|2023-02-01 00:00:00.141334| |BTC-USD|buy |23128.33|3.7407E-4|2023-02-01 00:00:01.169344| |BTC-USD|buy |23128.33|8.1796E-4|2023-02-01 00:00:01.184992| +-------+----+--------+---------+--------------------------+ only showing top 3 rows Although sparktest.py speaks for itself, it is still worth mentioning that this application has a dependency on the JDBC driver located in postgresql-42.5.1.jar. It cannot run without this dependency; hence it has to be submitted to Spark together with the application. Optimizing Data Loading With Spark We have loaded data into Spark. Now we will look at how this was completed and some aspects to consider. The easiest way to peek under the hood is to check QuestDB's log, which should tell us how Spark interacted with the database. We will also make use of the Spark UI, which displays useful insights of the execution, including stages and tasks. Spark Connection to QuestDB: Spark Is Lazy QuestDB log shows that Spark connected three times to the database. For simplicity, I only show the relevant lines in the log: Shell 2023-03-21T21:12:24.031443Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:24.060520Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:12:24.072262Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Spark first queried the database when we created the DataFrame, but as it turns out, it was not too interested in the data itself. The query looked like this: SELECT * FROM trades WHERE 1=0 The only thing Spark wanted to know was the schema of the table in order to create an empty DataFrame. Spark evaluates expressions lazily and only does the bare minimum required at each step. After all, it is meant to analyze big data, so resources are incredibly precious for Spark. Especially memory: data is not cached by default. The second connection happened when Spark counted the rows of the DataFrame. It did not query the data this time, either. Interestingly, instead of pushing the aggregation down to the database by running SELECT count(*) FROM trades, it just queried a 1 for each record: SELECT 1 FROM trades. Spark adds the 1s together to get the actual count. Shell 2023-03-21T21:12:25.692098Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:25.693863Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT 1 FROM trades ] 2023-03-21T21:12:25.695284Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:12:25.749986Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:12:25.800765Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:12:25.881785Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Working with the data itself eventually forced Spark to get a taste of the table's content too. Filters are pushed down to the database by default. Shell 2023-03-21T21:12:26.132570Z I pg-server connected [ip=127.0.0.1, fd=28] 2023-03-21T21:12:26.134355Z I i.q.c.p.PGConnectionContext parse [fd=28, q=SELECT "symbol","side","price","amount","timestamp" FROM trades WHERE ("symbol" IS NOT NULL) AND ("side" IS NOT NULL) AND ("symbol" = 'BTC-USD') AND ("side" = 'buy') ] 2023-03-21T21:12:26.739124Z I pg-server disconnected [ip=127.0.0.1, fd=28, src=queue] We can see that Spark's interaction with the database is rather sophisticated and optimized to achieve good performance without wasting resources. The Spark DataFrame is the key component that takes care of the optimization, and it deserves some further analysis. What Is a Spark DataFrame? The name DataFrame sounds like a container to hold data, but we have seen earlier that this is not really true. So, what is a Spark DataFrame, then? One way to look at Spark SQL, with the risk of oversimplifying it, is that it is a query engine. df.filter(predicate) is really just another way of saying WHERE predicate. With this in mind, the DataFrame is pretty much a query, or actually more like a query plan. Most databases come with functionality to display query plans, and Spark has it too! Let's check the plan for the above DataFrame we just created: Python df.explain(extended=True) Shell == Parsed Logical Plan == Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Analyzed Logical Plan == symbol: string, side: string, price: double, amount: double, timestamp: timestamp Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Optimized Logical Plan == Filter ((isnotnull(symbol#0) AND isnotnull(side#1)) AND ((symbol#0 = BTC-USD) AND (side#1 = buy))) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Physical Plan == *(1) Scan JDBCRelation(trades) [numPartitions=1] [symbol#0,side#1,price#2,amount#3,timestamp#4] PushedFilters: [*IsNotNull(symbol), *IsNotNull(side), *EqualTo(symbol,BTC-USD), *EqualTo(side,buy)], ReadSchema: struct<symbol:string,side:string,price:double,amount:double,timestamp:timestamp> If the DataFrame knows how to reproduce the data by remembering the execution plan, it does not need to store the actual data. This is precisely what we have seen earlier. Spark desperately tried not to load our data, but this can have disadvantages too. Caching Data Not caching the data radically reduces Spark's memory footprint, but there is a bit of jugglery here. Data does not have to be cached because the plan printed above can be executed again and again and again... Now imagine how a mere decently-sized Spark cluster could make our lonely QuestDB instance suffer martyrdom. With a massive table containing many partitions, Spark would generate a large number of tasks to be executed parallel across different nodes of the cluster. These tasks would query the table almost simultaneously, putting a heavy load on the database. So, if you find your colleagues cooking breakfast on your database servers, consider forcing Spark to cache some data to reduce the number of trips to the database. This can be done by calling df.cache(). In a large application, it might require a bit of thinking about what is worth caching and how to ensure that Spark executors have enough memory to store the data. In practice, you should consider caching smallish datasets used frequently throughout the application's life. Let's rerun our code with a tiny modification, adding .cache(): Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() \ .cache() # print the number of rows print(df.count()) # print the first 3 rows of the data df.show(3, truncate=False) This time Spark hit the database only twice. First, it came for the schema, the second time for the data: SELECT "symbol","side","price","amount","timestamp" FROM trades. Shell 2023-03-21T21:13:04.122390Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:04.147353Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:13:04.159470Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] 2023-03-21T21:13:05.873960Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:05.875951Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT "symbol","side","price","amount","timestamp" FROM trades ] 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:13:08.479209Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Clearly, even a few carefully placed .cache() calls can improve the overall performance of an application, sometimes significantly. What else should we take into consideration when thinking about performance? Earlier, we mentioned that our Spark application consists of tasks, which are working on the different partitions of the data parallel. So, partitioned data mean parallelism, which results in better performance. Spark Data Partitioning Now we turn to the Spark UI. It tells us that the job was done in a single task: The truth is that we have already suspected this. The execution plan told us (numPartitions=1) and we did not see any parallelism in the QuestDB logs either. We can display more details about this partition with a bit of additional code: Python from pyspark.sql.functions import spark_partition_id, min, max, count df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-03 23:59:59.801778|1322570 | +-----------+--------------------------+--------------------------+------------------+ The UI helps us confirm that the data is loaded as a single partition. QuestDB stores this data in 3 partitions. We should try to fix this. Although it is not recommended, we can try to use DataFrame.repartition(). This call reshuffles data across the cluster while partitioning the data, so it should be our last resort. After running df.repartition(3, df.timestamp), we see 3 partitions, but not exactly the way we expected. The partitions overlap with one another: Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.698152|2023-02-03 23:59:59.801778|438550 | |1 |2023-02-01 00:00:00.078073|2023-02-03 23:59:57.188894|440362 | |2 |2023-02-01 00:00:00.828943|2023-02-03 23:59:59.309075|443658 | +-----------+--------------------------+--------------------------+------------------+ It seems that DataFrame.repartition() used hashes to distribute the rows across the 3 partitions. This would mean that all 3 tasks would require data from all 3 QuestDB partitions. Let's try this instead: df.repartitionByRange(3, "timestamp"): Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 21:22:35.656399|429389 | |1 |2023-02-01 21:22:35.665599|2023-02-02 21:45:02.613339|470372 | |2 |2023-02-02 21:45:02.641778|2023-02-03 23:59:59.801778|422809 | +-----------+--------------------------+--------------------------+------------------+ This looks better but still not ideal. That is because DaraFrame.repartitionByRange() samples the dataset and then estimates the borders of the partitions. What we really want is for the DataFrame partitions to match exactly the partitions we see in QuestDB. This way, the tasks running parallel in Spark do not cross their way in QuestDB, likely to result in better performance. Data source options are to the rescue! Let's try the following: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .option("partitionColumn", "timestamp") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 23:59:59.664083|487309 | |1 |2023-02-02 00:00:00.188002|2023-02-02 23:59:59.838473|457478 | |2 |2023-02-03 00:00:00.565319|2023-02-03 23:59:59.801778|377783 | +-----------+--------------------------+--------------------------+------------------+ After specifying partitionColumn, numPartitions, lowerBound, and upperBound, the situation is much better: the row counts in the partitions match what we have seen in the QuestDB logs earlier: rowCount=487309, rowCount=457478 and rowCount=377783. Looks like we did it! Shell 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] We can check Spark UI again; it also confirms that the job was completed in 3 separate tasks, each of them working on a single partition. Sometimes it might be tricky to know the minimum and maximum timestamps when creating the DataFrame. In the worst case, you could query the database for those values via an ordinary connection. We have managed to replicate our QuestDB partitions in Spark, but data does not always come from a single table. What if the data required is the result of a query? Can we load that, and is it possible to partition it? Options to Load Data: SQL Query vs. Table We can use the query option to load data from QuestDB with the help of a SQL query: Python # 1-minute aggregated trade data df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("query", "SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR") \ .load() Depending on the amount of data and the actual query, you might find that pushing the aggregations to QuestDB is faster than completing it in Spark. Spark definitely has an edge when the dataset is really large. Now let's try partitioning this DataFrame with the options used before with the option dbtable. Unfortunately, we will get an error message: Shell Options 'query' and 'partitionColumn' can not be specified together. However, we can trick Spark by just giving the query an alias name. This means we can go back to using the dbtable option again, which lets us specify partitioning. See the example below: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.ts), max(df.ts), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+-------------------+-------------------+------------------+ |partitionId|min(ts) |max(ts) |count(partitionId)| +-----------+-------------------+-------------------+------------------+ |0 |2023-02-01 00:00:00|2023-02-01 23:59:00|1440 | |1 |2023-02-02 00:00:00|2023-02-02 23:59:00|1440 | |2 |2023-02-03 00:00:00|2023-02-03 23:59:00|1440 | +-----------+-------------------+-------------------+------------------+ Looking good. Now it seems that we can load any data from QuestDB into Spark by passing a SQL query to the DataFrame. Do we, really? Our trades table is limited to three data types only. What about all the other types you can find in QuestDB? We expect that Spark will successfully map a double or a timestamp when queried from the database, but what about a geohash? It is not that obvious what is going to happen. As always, when unsure, we should test. Type Mappings I have another table in the database with a different schema. This table has a column for each type currently available in QuestDB. SQL CREATE TABLE all_types ( symbol SYMBOL, string STRING, char CHAR, long LONG, int INT, short SHORT, byte BYTE, double DOUBLE, float FLOAT, bool BOOLEAN, uuid UUID, --long128 LONG128, long256 LONG256, bin BINARY, g5c GEOHASH(5c), date DATE, timestamp TIMESTAMP ) timestamp (timestamp) PARTITION BY DAY; INSERT INTO all_types values('sym1', 'str1', 'a', 456, 345, 234, 123, 888.8, 11.1, true, '9f9b2131-d49f-4d1d-ab81-39815c50d341', --to_long128(10, 5), rnd_long256(), rnd_bin(10,20,2), rnd_geohash(35), to_date('2022-02-01', 'yyyy-MM-dd'), to_timestamp('2022-01-15T00:00:03.234', 'yyyy-MM-ddTHH:mm:ss.SSS')); long128 is not fully supported by QuestDB yet, so it is commented out. Let's try to load and print the data; we can also take a look at the schema of the DataFrame: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) Much to my surprise, Spark managed to create the DataFrame and mapped all types. Here is the schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DoubleType(), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', TimestampType(), True), StructField('timestamp', TimestampType(), True) ]) It looks pretty good, but you might wonder if it is a good idea to map long256 and geohash types to String. QuestDB does not provide arithmetics for these types, so it is not a big deal. Geohashes are basically 32-base numbers, represented and stored in their string format. The 256-bit long values are also treated as string literals. long256 is used to store cryptocurrency private keys. Now let's see the data: Shell +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double|float|bool|uuid | +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8 |11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+-------------------+-----------------------+ |g5c |date |timestamp | +-----+-------------------+-----------------------+ |q661k|2022-02-01 00:00:00|2022-01-15 00:00:03.234| +-----+-------------------+-----------------------+ It also looks good, but we could omit the 00:00:00 from the end of the date field. We can see that it is mapped to Timestamp and not Date. We could also try to map one of the numeric fields to Decimal. This can be useful if later we want to do computations that require high precision. We can use the customSchema option to customize the column types. Our modified code: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .option("customSchema", "date DATE, double DECIMAL(38, 10)") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) The new schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DecimalType(38,10), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', DateType(), True), StructField('timestamp', TimestampType(), True) ]) And the data is displayed as: Shell +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double |float|bool|uuid | +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8000000000|11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+----------+-----------------------+ |g5c |date |timestamp | +-----+----------+-----------------------+ |q661k|2022-02-01|2022-01-15 00:00:03.234| +-----+----------+-----------------------+ It seems that Spark can handle almost all database types. The only issue is long128, but this type is a work in progress currently in QuestDB. When completed, it will be mapped as String, just like long256. Writing Data Back Into the Database There is only one thing left: writing data back into QuestDB. In this example, first, we will load some data from the database and add two new features: 10-minute moving average standard deviation, also calculated over the last 10-minute window Then we will try to save the modified DataFrame back into QuestDB as a new table. We need to take care of some type mappings as Double columns are sent as FLOAT8 to QuestDB by default, so we end up with this code: Python from pyspark.sql import SparkSession from pyspark.sql.window import Window from pyspark.sql.functions import avg, stddev, when # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() # add new features window_10 = Window.partitionBy(df.symbol).rowsBetween(-10, Window.currentRow) df = df.withColumn("ma10", avg(df.mid).over(window_10)) df = df.withColumn("std", stddev(df.mid).over(window_10)) df = df.withColumn("std", when(df.std.isNull(), 0.0).otherwise(df.std)) # save the data as 'trades_enriched' df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save() All works but…we soon realize that our new table, trades_enriched is not partitioned and does not have a designated timestamp, which is not ideal. Obviously Spark has no idea of QuestDB specifics. It would work better if we created the table upfront and Spark only saved the data into it. We drop the table and re-create it; this time, it is partitioned and has a designated timestamp: SQL DROP TABLE trades_enriched; CREATE TABLE trades_enriched ( volume DOUBLE, mid DOUBLE, ts TIMESTAMP, ma10 DOUBLE, std DOUBLE ) timestamp (ts) PARTITION BY DAY; The table is empty and waiting for the data. We rerun the code; all works, no complaints. The data is in the table, and it is partitioned. One aspect of writing data into the database is if we are allowed to create duplicates. What if I try to rerun the code again without dropping the table? Will Spark let me save the data this time? No, we get an error: Shell pyspark.sql.utils.AnalysisException: Table or view 'trades_enriched' already exists. SaveMode: ErrorIfExists. The last part of the error message looks interesting: SaveMode: ErrorIfExists. What is SaveMode? It turns out we can configure what should happen if the table already exists. Our options are: errorifexists: the default behavior is to return an error if the table already exists, Spark is playing safe here append : data will be appended to the existing rows already present in the table overwrite: the content of the table will be replaced entirely by the newly saved data ignore: if the table is not empty, our save operation gets ignored without any error We have already seen how errorifexists behaves, append and ignore seem to be simple enough just to work. However, overwrite is not straightforward. The content of the table must be cleared before the new data can be saved. Spark will delete and re-create the table by default, which means losing partitioning and the designated timestamp. In general, we do not want Spark to create tables for us. Luckily, with the truncate option we can tell Spark to use TRUNCATE to clear the table instead of deleting it: Python # save the data as 'trades_enriched', overwrite if already exists df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("truncate", True) \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save(mode="overwrite") The above works as expected. Conclusion Our ride might seem bumpy, but we finally have everything working. Our new motto should be "There is a config option for everything!". To summarize what we have found: We can use Spark's JDBC data source to integrate with QuestDB. It is recommended to use the dbtable option, even if we use a SQL query to load data. Always try to specify partitioning options (partitionColumn, numPartitions, lowerBound and upperBound) when loading data, partitions ideally should match with the QuestDB partitions. Sometimes it makes sense to cache some data in Spark to reduce the number of trips to the database. It can be beneficial to push work down into the database, depending on the task and how much data is involved. It makes sense to make use of QuestDB's time-series-specific features, such as SAMPLE BY, instead of trying to rewrite it in Spark. Type mappings can be customized via the customSchema option when loading data. When writing data into QuestDB always specify the desired saving mode. Generally works better if you create the table upfront and do not let Spark create it, because this way you can add partitioning and designated timestamp. If selected the overwrite saving mode, you should enable the truncate option too to make sure Spark does not delete the table; hence partitioning and the designated timestamp will not get lost. Type mappings can be customized via the createTableColumnTypes option when saving data. I mentioned only the config options which are most likely to be tweaked when working with QuestDB; the complete set of options can be found here: Spark data source options. What Could the Future Bring? Overall everything works, but it would be nice to have a much more seamless way of integration, where partitioning would be taken care of automagically. Some type mappings could use better defaults, too, when saving data into QuestDB. The overwrite saving mode could default to use TRUNCATE. More seamless integration is not impossible to achieve. If QuestDB provided its own JDBCDialect implementation for Spark, the above nuances could be handled. We should probably consider adding this. Finally, there is one more thing we did not mention yet, data locality. That is because, currently QuestDB cannot run as a cluster. However, we are actively working on a solution — check out The Inner Workings of Distributed Databases for more information. When the time comes, we should ensure that data locality is also considered. Ideally, each Spark node would work on tasks that require partitions loaded from the local (or closest) QuestDB instance. However, this is not something we should be concerned about at this moment... for now, just enjoy data crunching!
ActiveMQ and Kafka are both messaging systems used for real-time data processing and streaming. Both of these systems are open-source and offer different features that cater to specific use cases. While ActiveMQ JMS and Kafka are both used for message queuing and real-time data processing, there are significant differences between them. ActiveMQ JMS is a traditional message broker that supports multiple messaging protocols such as JMS, AMQP, and MQTT. It is designed to provide reliable message delivery and offers features such as message persistence, clustering, and transaction support. ActiveMQ JMS is commonly used in enterprise systems for mission-critical applications where reliability is of utmost importance. Sample Example for consuming message from rest API and Writing to AMQ queues: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class RestApiToActiveMq { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from REST API and write them to ActiveMQ RouteBuilder builder = new RouteBuilder() { public void configure() { from("rest:get:/api/messages") .to("activemq:queue:myQueue"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } Sample Example for consuming message AMQ queues and writing to snowflakes Data warehouse: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class AmqToSnowflake { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from AMQ and write them to Snowflake RouteBuilder builder = new RouteBuilder() { public void configure() { from("activemq:queue:myQueue") .to("snowflake-jdbc:myDatabase?query=INSERT INTO myTable (message) VALUES (:?message)"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } On the other hand, Kafka is a distributed streaming platform designed for handling large-scale data streaming. It is optimized for horizontal scalability, fault tolerance, and high throughput, making it an excellent choice for big data applications. In addition, Kafka offers features such as real-time data streaming, high-performance messaging, and distributed data storage. Sample Example for consuming message from rest API and Writing to Kafak Topic: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class RestApiToKafka { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from REST API and write them to Kafka RouteBuilder builder = new RouteBuilder() { public void configure() { from("rest:get:/api/messages") .to("kafka:myTopic?brokers=localhost:9092"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } Sample Example for consuming message Kafka Topic and writing to snowflakes Data warehouse: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class KafkaToSnowflake { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from Kafka and write them to Snowflake RouteBuilder builder = new RouteBuilder() { public void configure() { from("kafka:myTopic?brokers=localhost:9092") .to("snowflake-jdbc:myDatabase?query=INSERT INTO myTable (message) VALUES (:?message)"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } One of the key differences between ActiveMQ JMS and Kafka is their architecture. ActiveMQ JMS is a traditional messaging system that is based on a hub-and-spoke model, where the message broker acts as a centralized hub for all message exchanges. On the other hand, Kafka is designed as a distributed system that uses a publish-subscribe model, where messages are published to a topic, and subscribers consume the messages from that topic. Kafka's distributed architecture provides fault tolerance and high availability, making it an ideal choice for mission-critical applications. Another difference between ActiveMQ JMS and Kafka is their performance. ActiveMQ JMS is designed to provide reliable message delivery with features such as message persistence and transaction support. While this provides a high level of reliability, it can also impact performance. In contrast, Kafka's architecture is designed for high throughput and low latency, making it an excellent choice for real-time data processing and analysis. In terms of use cases, ActiveMQ JMS is an excellent choice for traditional messaging applications where reliability and message ordering are more important than speed. It is commonly used in enterprise systems for mission-critical applications where reliability is of utmost importance. On the other hand, Kafka is an excellent choice for real-time data processing and analysis. It is commonly used for big data applications where high throughput and low latency are critical. In conclusion, both ActiveMQ JMS and Kafka are excellent messaging systems that offer different features for different use cases. ActiveMQ JMS is an excellent choice for traditional messaging applications where reliability is of utmost importance, while Kafka is an excellent choice for real-time data processing and analysis. Therefore, it is important to consider the specific requirements of your application when choosing between ActiveMQ JMS and Kafka.
Apache Kafka is an event streaming platform that was developed by LinkedIn and later made open-source under the Apache Software Foundation. Its primary function is to handle high-volume real-time data streams and provide a scalable and fault-tolerant architecture for creating data pipelines, streaming applications, and microservices. Kafka employs a publish-subscribe messaging model, in which data is sorted into topics, and publishers send messages to those topics. Subscribers can then receive those messages in real time. The platform offers a scalable and fault-tolerant architecture by spreading data across multiple nodes and replicating data across multiple brokers. This guarantees that data is consistently available, even if a node fails. Kafka's architecture is based on several essential components, including brokers, producers, consumers, and topics. Brokers manage the message queues and handle message persistence, while producers and consumers are responsible for publishing and subscribing to Kafka topics, respectively. Topics function as the communication channels through which messages are sent and received. Kafka also provides an extensive range of APIs and tools to manage data streams and build real-time applications. Kafka Connect, one of its most popular tools and APIs, enables the creation of data pipelines that integrate with other systems. Kafka Streams, on the other hand, allows developers to build streaming applications using a high-level API. In summary, Kafka is a robust and adaptable platform that can be used to construct real-time data pipelines and streaming applications. It has been widely adopted in various sectors, including finance, healthcare, e-commerce, and more. To create a Kafka data stream using Camel, you can use the Camel-Kafka component, which is already included in Apache Camel. Below are the steps to follow for creating a Kafka data stream using Camel: Prepare a Kafka broker and create a topic for the data stream. Set up a new Camel project on your IDE and include the required Camel dependencies, including the Camel-Kafka component. Create a new Camel route within your project that defines the data stream. The route should use the Kafka component and specify the topic to which the data should be sent or received. Select the appropriate data format for the data stream. For instance, if you want to send JSON data, use the Jackson data format to serialize and deserialize the data. Launch the Camel context and the Kafka producer or consumer to start sending or receiving data. Overall, using the Camel-Kafka component with Apache Camel is a simple way to create data streams between applications and a Kafka cluster. Here is the code for reading Table form DB and writing to Kafka cluster: Apache Camel Producer Application: Java import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; import org.springframework.stereotype.Component; @Component public class OracleDBToKafkaRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { // Configure Oracle DB endpoint String oracleDBEndpoint = "jdbc:oracle:thin:@localhost:1521:orcl"; String oracleDBUser = "username"; String oracleDBPassword = "password"; String oracleDBTable = "mytable"; String selectQuery = "SELECT * FROM " + oracleDBTable; // Configure Kafka endpoint String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092"; String kafkaSerializer = "org.apache.kafka.common.serialization.StringSerializer"; from("timer:oracleDBPoller?period=5000") // Read from Oracle DB .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword) .setBody(simple(selectQuery)) .split(body()) // Serialize to Kafka .setHeader(KafkaConstants.KEY, simple("${body.id}")) .marshal().string(kafkaSerializer) .to(kafkaEndpoint); } } Here is the code for reading Kafka Topic and writing the Oracle DB table: Apache Camel Camel Application; Java import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; import org.springframework.stereotype.Component; @Component public class KafkaToOracleDBRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { // Configure Kafka endpoint String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092"; String kafkaDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"; // Configure Oracle DB endpoint String oracleDBEndpoint = "jdbc:oracle:thin:@localhost:1521:orcl"; String oracleDBUser = "username"; String oracleDBPassword = "password"; String oracleDBTable = "mytable"; from(kafkaEndpoint) // Deserialize from Kafka .unmarshal().string(kafkaDeserializer) .split(body().tokenize("\n")) // Write to Oracle DB .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword) .setBody(simple("INSERT INTO " + oracleDBTable + " VALUES(${body})")) .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword); } }
Let’s start with the basics. After successfully starting a Redpanda or Apache Kafka® cluster, you want to stream data into it right away. No matter what tool and language you chose, you will immediately be asked for a list of bootstrap servers for your client to connect to it. This bootstrap server is just for your client to initiate the connection to one of the brokers in the cluster, and then it will provide your client with initial sets of metadata. The metadata tells the client the currently available brokers, and which the leader of each partition is hosted by which brokers so that the client can initiate a direct connection to all brokers individually. The diagram below will give you a better idea. The client figures out where to stream the data based on the info given. Depending on the number of partitions and where they are hosted, your client will push or pull from the partitions to/from its host brokers. Both the Kafka address and the advertised Kafka address are needed. Kafka Address is used for Kafka brokers to locate each other, and the advertised address for the client to find them. In this post, we’ll help you understand the advertised Kafka address, how to use it in Docker and Kubernetes (K8s), and how to debug it. When To Use Kafka Address and Advertised Kafka Address When starting up your Redpanda cluster, Kafka Address is used to bind the Redpanda service to its host and use the established endpoint to start accepting requests. {LISTENER_NAME}://{HOST_NAME}:{PORT} The broker uses the advertised Kafka address in the metadata, so your client will take the address to locate other brokers. To set it, use --kafka-addr and --advertise-kafka-addr with RPK or kafka_api or advertised_kafka_api inside /etc/redpanda/redpanda.yaml for each broker. Until this point, everything seems straightforward. And, you might start to wonder whether the Kafka address and advertised Kafka address are actually redundant. It starts to get tricky when your client has no visibility into the cluster host, and if you pass the same internal address to the client, it won’t be able to resolve it. So, we need to modify the advertised Kafka address to let the Kafka client understand and be reachable from outside (i.e., external IP). How To Use Kafka Address and Advertised Kafka Address in Docker (Container) Another problem that often comes up is while running Redpanda brokers in Docker (container). The same applies to other more complex network topologies. But fear not, you already know the mechanics. All you need to do is put the right address for clients that reside in different places. When running the Docker container, it creates its own network by default, but in the case where you need to have multiple of them communicating, you will need to set up a network (sometimes even multiple layers of network) by bridging them together. We know that the Kafka address is used for binding to the host, we’ll just use 0.0.0.0 as it will bind to all interfaces in the host and any port of your wish (do not use an already occupied port). An example could be 0.0.0.0:9092 and 0.0.0.0:9095 for each broker running in the Docker container, you will register a name in the network, if your client is trying to access the broker within the network, all you need to do is set the advertised Kafka address to it’s registered hostname in the network. For example, if your first Redpanda container registered its name as Building-A, you can set the advertised Kafka address to Building-A:9092. For clients outside of the Docker network, where they don’t have access to the network’s routing table, the advertised Kafka address will need to be set to the host where the Docker containers are running on, so the client can find it. And don’t forget that you also need to expose the port and associate that with the host. But, what happens if you have clients who both want to access the cluster at the same time? Simple, add multiple listeners! Each listener will return a set of advertised Kafka addresses for clients in a different environment. Here’s a diagram for you. Using Kafka Address and Advertised Kafka Address in K8s Since Kubernetes is a platform that orchestrates containers, the concept is very similar to running Docker containers, but on a larger scale. In a typical Redpanda cluster, you will want to install a single Redpanda broker in an individual worker node. All the pods running in the K8s would get assigned an internal address, that is only visible inside the Kubernetes environment, if the client is running outside of Kubernetes, it will need a way to find the brokers. So you can use NodePort to expose the port and use the public IP address of the hosting worker node. For the Kafka address, as usual, just bind it to the local container. For example, 0.0.0.0:9092 and 0.0.0.0:9095. As for the advertised Kafka address, we will need to set two listeners: one for internal connection, and one for external. For internal clients, we can simply use the generated internal service name, for example, if your service name is set to Building-A, the advertised Kafka address will be internal://Building-A:9092.For the external listener use the hosting worker node’s public IP (or domain name ) with the port exposed in NodePort, where you will be assigned a new port. For example, if your first work node has public IP (Domain) as XXX-Blvd, and the new port assigned is 39092, you can set the advertised Kafka address to external://XXX-Blvd:39092 . How to Debug the Advertised Kafka Address When you are able to connect to your cluster with Redpanda Keep (rpk) and your client throws errors like “ENOTFOUND”. Check if the advertised_kafka_api is correctly set, with an address that can be resolved by your client. Shell > curl localhost:9644/v1/node_config {"advertised_kafka_api":[{"name":"internal","address":"0.0.0.0","port":9092},{"name":"external","address":"192.186.0.3","port":19092}]....} If you are running Docker, find out which port 9644 was exposed from Docker. Shell > docker port redpanda-0 8081/tcp -> 0.0.0.0:18081 9644/tcp -> 0.0.0.0:19644 18082/tcp -> 0.0.0.0:18082 19092/tcp -> 0.0.0.0:19092 And cURL. Shell > curl localhost:19644/v1/node_config {"advertised_kafka_api":[{"name":"internal","address":"redpanda-0","port":9092},{"name":"external","address":"localhost","port":19092}]....} If you are running Kubernetes, find out what is the exposed admin port. Shell > kubectl get svc redpanda-external -n redpanda NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) redpanda-external NodePort 10.100.87.34 <none> 9644:31644/TCP,9094:31092/TCP,8083:30082/TCP,8084:30081/TCP 3h53m Shell > kubectl get pod -o=custom-columns=NAME:.metadata.name,STATUS:.status.phase,NODE:.spec.nodeName -n redpanda NAME NODE redpanda-0 ip-1-168-57-208.xx.compute.internal redpanda-1 ip-1-168-1-231.xx.compute.internal redpanda-2 ip-1-168-83-90.xx.compute.internal Shell >kubectl get nodes -o=custom-columns='NAME:.metadata.name,IP:.status.addresses[?(@.type=="ExternalIP")].address' NAME IP ip-1.us-east-2.compute.internal 3.12.84.230 ip-1.us-east-2.compute.internal 3.144.255.61 ip-1.us-east-2.compute.internal 3.144.144.138 And cURL. Shell > curl 3.12.84.230:31644/v1/node_config {"advertised_kafka_api":[{"name":"internal","address":"redpanda-1.redpanda.redpanda.svc.cluster.local.","port":9093},{"name":"default","address":"3.12.84.230","port":31092}].....} Lastly, check if you are connecting to the correct listener(Port). And you’re all done! Conclusion If you made it this far, you should now have a better understanding of what the advertised Kafka address is and how you can use it in Docker and K8s. To learn more about Redpanda, check out our documentation and browse the Redpanda blog for tutorials and guides on how to easily integrate with Redpanda. For a more hands-on approach, take Redpanda's free Community edition for a test drive!
Data streaming is one of the most relevant buzzwords in tech to build scalable real-time applications in the cloud and innovative business models. Do you wonder about my predicted TOP 5 data streaming trends in 2023 to set data in motion? Check out the following presentation and learn what role Apache Kafka plays. Learn about decentralized data mesh, cloud-native lakehouse, data sharing, improved user experience, and advanced data governance. Some followers might notice that this became a series with past posts about the top 5 data streaming trends for 2021 and the top 5 for 2022. Data streaming with Apache Kafka is a journey and evolution to set data in motion. Trends change over time, but the core value of a scalable real-time infrastructure as the central data hub stays. Gartner Top Strategic Technology Trends for 2023 The research and consulting company Gartner defines the top strategic technology trends every year. This time, the trends are more focused on particular niche concepts. On a higher level, it is all about optimizing, scaling, and pioneering. Here is what Gartner expects for 2023: Source Gartner It is funny (but not surprising): Gartner’s predictions overlap and complement the five trends I focus on for data streaming with Apache Kafka looking forward to 2023. I explore how data streaming enables better time to market with decentralized optimized architectures, cloud-native infrastructure for elastic scale, and pioneering innovative use cases to build valuable data products. Hence, here you go with the top 5 trends in data streaming for 2023. The Top 5 Data Streaming Trends for 2023 I see the following topics coming up more regularly in conversations with customers, prospects, and the broader Kafka community across the globe: Cloud-native lakehouses Decentralized data mesh Data sharing in real-time Improved developer and user experience Advanced data governance and policy enforcement The following sections describe each trend in more detail. The end of the article contains the complete slide deck. The trends are relevant for various scenarios. No matter if you use open source Apache Kafka, a commercial platform, or a fully-managed cloud service like Confluent Cloud. Kafka as Data Fabric for Cloud-Native Lakehouses Many data platform vendors pitch the lakehouse vision today. That's the same story as the data lake in the Hadoop era with few new nuances. Put all your data into a single data store to save the world and solve every problem and use case: In the last ten years, most enterprises realized this strategy did not work. The data lake is great for reporting and batch analytics, but not the right choice for every problem. Besides technical challenges, new challenges emerged: data governance, compliance issues, data privacy, and so on. Applying a best-of-breed enterprise architecture for real-time and batch data analytics using the right tool for each job is a much more successful, flexible, and future-ready approach: Data platforms like Databricks, Snowflake, Elastic, MongoDB, BigQuery, etc., have their sweet spots and trade-offs. Data streaming increasingly becomes the real-time data fabric between all the different data platforms and other business applications leveraging the real-time Kappa architecture instead of the much more batch-focused Lamba architecture. Decentralized Data Mesh With Valuable Data Products Focusing on business value by building data products in independent domains with various technologies is key to success in today's agile world with ever-changing requirements and challenges. Data mesh came to the rescue and emerged as a next-generation design pattern, succeeding service-oriented architectures and microservices. Two main proposals exist by vendors for building a data mesh: Data integration with data streaming enables fully decentralized business products. On the other side, data virtualization provides centralized queries: Centralized queries are simple but do not provide a clean architecture and decoupled domains and applications. It might work well to solve a single problem in a project. However, I highly recommend building a decentralized data mesh with data streaming to decouple the applications, especially for strategic enterprise architectures. Collaboration Within and Across Organizations in Real Time Collaborating within and outside the organization with data sharing using Open APIs, streaming data exchange, and cluster linking enable many innovative business models: The difference between data streaming to a database, data warehouse, or data lake is crucial: All these platforms enable data sharing at rest. The data is stored on a disk before it is replicated and shared within the organization or with partners. This is not real time. You cannot connect a real-time consumer to data at rest. However, real-time data beats slow data. Hence, sharing data in real time with data streaming platforms like Apache Kafka or Confluent Cloud enables accurate data as soon as a change happens. A consumer can be real-time, near real-time, or batch. A streaming data exchange puts data in motion within the organization or for B2B data sharing and Open API business models. AsyncAPI Spec for Apache Kafka API Schemas AsyncAPI allows developers to define the interfaces of asynchronous APIs. It is protocol agnostic. Features include: Specification of OpenAPI contracts (= schemas in the data streaming world) Documentation of APIs Code generation for many programming languages Data governance And much more... Confluent Cloud recently added a feature for generating an AsyncAPI specification for Apache Kafka clusters. We don't know yet where the market is going. Will AsynchAPI become the standard for OpenAPI in data streaming? Maybe. I see increasing demand for this specification by customers. Let's review the status of AsynchAPI in a few quarters or years. But it has the potential. Improved Developer Experience With Low-Code/No-Code Tools for Apache Kafka Many analysts and vendors pitch low code/no code tools. Visual coding is nothing new. Very sophisticated, powerful, and easy-to-use solutions exist as IDE or cloud applications. The significant benefit is time-to-market for developing applications and easier maintenance. At least in theory. These tools support various personas like developers, citizen integrators, and data scientists. At least in theory. The reality is that: Code is king Development is about evolution Open platforms win Low code/no code is great for some scenarios and personas. But it is just one option of many. Let's look at a few alternatives for building Kafka-native applications: These Kafka-native technologies have their trade-offs. For instance, the Confluent Stream Designer is perfect for building streaming ETL pipelines between various data sources and sinks. Just click the pipeline and transformations together. Then deploy the data pipeline into a scalable, reliable, and fully-managed streaming application. The difference to separate tools like Apache Nifi is that the generated code run in the same streaming platform, i.e., one infrastructure end-to-end. This makes ensuring SLAs and latency requirements much more manageable and the whole data pipeline more cost-efficient. However, the simpler a tool is, the less flexible it is. It is that easy. No matter which product or vendor you look at. This is not just true for Kafka-native tools. And you are flexible with your tool choice per project or business problem. Add your favorite non-Kafka stream processing engine to the stack, for instance, Apache Flink. Or use a separate iPaaS middleware like Dell Boomi or SnapLogic. Domain-Driven Design With Dumb Pipes and Smart Endpoints The real benefit of data streaming is the freedom of choice for your favorite Kafka-native technology, open-source stream processing framework, or cloud-native iPaaS middleware. Choose the proper library, tool, or SaaS for your project. Data streaming enables a decoupled domain-driven design with dumb pipes and smart endpoints: Data streaming with Apache Kafka is perfect for domain-driven design (DDD). On the contrary, often used point-to-point microservice architecture HTTP/REST web service or push-based message brokers like RabbitMQ create much stronger dependencies between applications. Data Governance Across the Data Streaming Pipeline An enterprise architecture powered by data streaming enables easy access to data in real-time. Many enterprises leverage Apache Kafka as the central nervous system between all data sources and sinks. The consequence of being able to access all data easily across business domains is two conflicting pressures on organizations: Unlock the data to enable innovation versus Lock up the data to keep it safe. Achieving data governance across the end-to-end data streams with data lineage, event tracing, policy enforcement, and time travel to analyze historical events is critical for strategic data streaming in the enterprise architecture. Data governance on top of the streaming platform is required for end-to-end visibility, compliance, and security: Policy Enforcement With Schemas and API Contracts The foundation for data governance is the management of API contracts (so-called schemas in data streaming platforms like Apache Kafka). Solutions like Confluent enforce schemas along the data pipeline, including data producer, server, and consumer: Additional data governance tools like data lineage, catalog, or police enforcement are built on this foundation. The recommendation for any serious data streaming project is to use schema from the beginning. It is unnecessary for the first pipeline. But the following producers and consumers need a trusted environment with enforced policies to establish a decentralized data mesh architecture with independent but connected data products. Slides and Video for Data Streaming Use Cases in 2023 Here is the slide deck from my presentation: Fullscreen Mode And here is the free on-demand video recording. Data Streaming Goes Up in the Maturity Curve in 2023 It is still an early stage for data streaming in most enterprises. But the discussion goes beyond questions like "when to use Kafka?" or "which cloud service to use?"... In 2023, most enterprises look at more sophisticated challenges around their numerous data streaming projects. The new trends are often related to each other. A data mesh enables the building of independent data products that focus on business value. Data sharing is a fundamental requirement for a data mesh. New personas access the data stream. Often, citizen developers or data scientists need easy tools to pioneer new projects. The enterprise architecture requires and enforces data governance across the pipeline for security, compliance, and privacy reasons. Scalability and elasticity need to be there out of the box. Fully-managed data streaming is a brilliant opportunity for getting started in 2023 and moving up in the maturity curve from single projects to a central nervous system of real-time data. What are your most relevant and exciting trends for data streaming and Apache Kafka in 2023 to set data in motion? What are your strategy and timeline?
Previous Articles on CockroachDB CDC Using CockroachDB CDC with Azure Event Hubs Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry SaaS Galore: Integrating CockroachDB with Confluent Kafka, Fivetran, and Snowflake CockroachDB CDC using Minio as a cloud storage sink CockroachDB CDC using Hadoop Ozone S3 Gateway as a cloud storage sink Motivation Apache Pulsar is a cloud-native distributed messaging and streaming platform. In my customer conversations, it most often comes up when compared to Apache Kafka. I have a customer needing a Pulsar sink support as they rely on Pulsar's multi-region capabilities. CockroachDB does not have a native Pulsar sink; however, the Pulsar project supports Kafka on Pulsar protocol support, and that's the core of today's article. This tutorial assumes you have an enterprise license, you can also leverage our managed offerings where enterprise changefeeds are enabled by default. I am going to demonstrate the steps using a Docker environment instead. High-Level Steps Deploy Apache Pulsar Deploy a CockroachDB cluster with enterprise changefeeds Deploy a Kafka Consumer Verify Conclusion Step-By-Step Instructions Deploy Apache Pulsar Since I'm using Docker, I'm relying on the KoP Docker Compose environment provided by the Stream Native platform, which spearheads the development of Apache Pulsar. I've used the service taken from the KoP example almost as-is aside from a few differences: pulsar: container_name: pulsar hostname: pulsar image: streamnative/sn-pulsar:2.11.0.5 command: > bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw" # disable stream storage and functions worker environment: allowAutoTopicCreationType: partitioned brokerDeleteInactiveTopicsEnabled: "false" PULSAR_PREFIX_messagingProtocols: kafka PULSAR_PREFIX_kafkaListeners: PLAINTEXT://pulsar:9092 PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor PULSAR_PREFIX_webServicePort: "8088" ports: - 6650:6650 - 8088:8088 - 9092:9092 I removed PULSAR_PREFIX_kafkaAdvertisedListeners: PLAINTEXT://127.0.0.1:19092 as I don't need it, I also changed the exposed port for - 19092:9092 to - 9092:9092. My PULSAR_PREFIX_kafkaListeners address points to a Docker container with the hostname pulsar. I will need to access the address from other containers and I can't rely on the localhost. I'm also using a more recent version of the image than the one in their docs. Deploy a CockroachDB Cluster With Enterprise Changefeeds I am using a 3-node cluster in Docker. If you've followed my previous articles, you should be familiar with it. I am using Flyway to set up the schema and seed the tables. The actual schema and data are taken from the changefeed examples we have in our docs. The only difference is I'm using a database called example. To enable CDC we need to execute the following commands: SET CLUSTER SETTING cluster.organization = '<organization name>'; SET CLUSTER SETTING enterprise.license = '<secret>'; SET CLUSTER SETTING kv.rangefeed.enabled = true; Again, if you don't have an enterprise license, you won't be able to complete this tutorial. Feel free to use our Dedicated or Serverless instances if you want to follow along. Finally, after the tables and the data are in place, we can create a changefeed on these tables. CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://pulsar:9092'; Here I am using the Kafka port and the address of the Pulsar cluster, in my case pulsar. job_id ---------------------- 855538618543276035 (1 row) NOTICE: changefeed will emit to topic office_dogs NOTICE: changefeed will emit to topic employees Time: 50ms total (execution 49ms / network 1ms) Everything seems to work and changefeed does not error out. Deploy a Kafka Consumer To validate data is being written to Pulsar, we need to stand up a Kafka client. I've created an image that downloads and installs Kafka. Once the entire Docker Compose environment is running, we can access the client and run the console consumer to verify. /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic office_dogs --from-beginning {"after": {"id": 1, "name": "Petee H"} {"after": {"id": 2, "name": "Carl"} If we want to validate that new data is flowing, let's insert another record into CockroachDB: INSERT INTO office_dogs VALUES (3, 'Test'); The consumer will print a new row: {"after": {"id": 3, "name": "Test"} Since we've created two topics, let's now look at the employees topic. /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic employees --from-beginning {"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 855539880336523267} {"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 855539880336654339} Similarly, let's update a record and see the changes propagate to Pulsar. UPDATE employees SET employee_name = 'Spencer Kimball' WHERE dog_id = 2; {"after": {"dog_id": 2, "employee_name": "Spencer Kimball", "rowid": 855539880336654339} Verify We've confirmed we can produce messages to Pulsar topics using the Kafka protocol via KoP. We've also confirmed we can consume using the Kafka console consumer. We can also use the native Pulsar tooling to confirm the data is consumable from Pulsar. I installed the Pulsar Python client, pip install pulsar-client, on the Kafka client machine and created a Python script with the following code: import pulsar client = pulsar.Client('pulsar://pulsar:6650') consumer = client.subscribe('employees', subscription_name='my-sub') while True: msg = consumer.receive() print("Received message: '%s'" % msg.data()) consumer.acknowledge(msg) client.close() I execute the script: root@kafka-client:/opt/kafka# python3 consume_messages.py 2023-04-11 14:17:21.761 INFO [281473255101472] Client:87 | Subscribing on Topic :employees 2023-04-11 14:17:21.762 INFO [281473255101472] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000 2023-04-11 14:17:21.762 INFO [281473255101472] ConnectionPool:97 | Created connection for pulsar://pulsar:6650 2023-04-11 14:17:21.763 INFO [281473230237984] ClientConnection:388 | [172.28.0.3:33826 -> 172.28.0.6:6650] Connected to broker 2023-04-11 14:17:21.771 INFO [281473230237984] HandlerBase:72 | [persistent://public/default/employees-partition-0, my-sub, 0] Getting connection from pool 2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000 2023-04-11 14:17:21.776 INFO [281473230237984] ConnectionPool:97 | Created connection for pulsar://localhost:6650 2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:390 | [172.28.0.3:33832 -> 172.28.0.6:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650 2023-04-11 14:17:21.786 INFO [281473230237984] ConsumerImpl:238 | [persistent://public/default/employees-partition-0, my-sub, 0] Created consumer on broker [172.28.0.3:33832 -> 172.28.0.6:6650] 2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:274 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0 2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:137 | Successfully Subscribed to Topics Let's insert a record into the employees tables: INSERT INTO employees (dog_id, employee_name) VALUES (3, 'Test'); UPDATE employees SET employee_name = 'Artem' WHERE dog_id = 3; The Pulsar client output is as follows: Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}'' Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}'' Conclusion This is how you can leverage existing CockroachDB capability with non-standard services like Apache Pulsar. Hopefully, you've found this article useful and can start leveraging the existing Kafka sink with non-standard message brokers.
Apache Kafka is a battle-tested distributed stream-processing platform popular in the financial industry to handle mission-critical transactional workloads. Kafka’s ability to handle large volumes of real-time market data makes it a core infrastructure component for trading, risk management, and fraud detection. Financial institutions use Kafka to stream data from market data feeds, transaction data, and other external sources to drive decisions. A common data pipeline to ingest and store financial data involves publishing real-time data to Kafka and utilizing Kafka Connect to stream that to databases. For example, the market data team may continuously update real-time quotes for a security to Kafka, and the trading team may consume that data to make buy/sell orders. Processed market data and orders may then be saved to a time series database for further analysis. In this article, we’ll create a sample data pipeline to illustrate how this could work in practice. We will poll an external data source (FinnHub) for real-time quotes of stocks and ETFs, and publish that information to Kafka. Kafka Connect will then grab those records and publish them to a time series database (QuestDB) for analysis. Prerequisites Git Docker Engine: 20.10+ Golang 1.19+ FinnHub API Token Setup To run the example locally, first clone the repo. The codebase is organized into three parts: Golang code is located at the root of the repo. Dockerfile for the Kafka Connect QuestDB image and the Docker Compose YAML file is under docker. JSON files for Kafka Connect sinks are under kafka-connect-sinks. Building the Kafka Connect QuestDB Image We first need to build the Kafka Connect docker image with QuestDB Sink connector. Navigate to the docker directory and run docker-compose build. The Dockerfile is simply installing the Kafka QuestDB Connector via Confluent Hub on top of the Confluent Kafka Connect base image: FROM confluentinc/cp-kafka-connect-base:7.3.2 RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.6 Start Kafka, Kafka Connect, and QuestDB Next, we will set up the infrastructure via Docker Compose. From the same docker directory, run Docker Compose in the background: docker-compose up -d This will start Kafka + Zookeeper, our custom Kafka Connect image with the QuestDB Connector installed, as well as QuestDB. The full content of the Docker Compose file is as follows: --- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.3.2 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-server:7.3.2 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' kafka-connect: image: cp-kafka-connect-questdb build: context: . hostname: connect container_name: connect depends_on: - broker - zookeeper ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_REST_PORT: 8083 CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter questdb: image: questdb/questdb hostname: questdb container_name: questdb ports: - "9000:9000" - "9009:9009" Start the QuestDB Kafka Connect Sink Wait for the Docker containers to be healthy (the kafka-connect image will log “Finished starting connectors and tasks” message), and we can create our Kafka Connect sinks. We will create two sinks: one for Tesla and one for SPY (SPDR S&P 500 ETF) to compare price trends of a volatile stock and the overall market. Issue the following curl command to create the Tesla sink within the kafka-connect-sinks directory: curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-TSLA.json http://localhost:8083/connectors The JSON file it posts contains the following configurations. { "name": "questdb-sink-SPY", "config": { "connector.class":"io.questdb.kafka.QuestDBSinkConnector", "tasks.max":"1", "topics": "topic_SPY", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "host": "questdb", "timestamp.field.name": "timestamp" } } Create the sink for SPY as well: curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-SPY.json http://localhost:8083/connectors Streaming Real Time Stock Quotes Now that we have our data pipeline set up, we are ready to stream real-time stock quotes to Kafka and store them in QuestDB. First, we need to get a free API token from Finnhub Stock API. Create a free account online and copy the API key. Export that key to our shell under FINNHUB_TOKEN : export FINNHUB_TOKEN=<my-token-here> The real-time quote endpoint returns various attributes such as the current price, high/low/open quotes, as well as previous close price. Since we are just interested in the current price, we only grab the price and add the ticket symbol and timestamp to the Kafka JSON message. The code below will grab the quote every 30 seconds and publish to the Kafka topic: topic_TSLA . package main import ( "encoding/json" "fmt" "io/ioutil" "net/http" "os" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) type StockData struct { Price float64 `json:"c"` } type StockDataWithTime struct { Symbol string `json:"symbol"` Price float64 `json:"price"` Timestamp int64 `json:"timestamp"` } func main() { // Create a new Kafka producer instance p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}) if err != nil { panic(fmt.Sprintf("Failed to create producer: %s\n", err)) } defer p.Close() for { token, found := os.LookupEnv("FINNHUB_TOKEN") if !found { panic("FINNHUB_TOKEN is undefined") } symbol := "TSLA" url := fmt.Sprintf("https://finnhub.io/api/v1/quote?symbol=%s&token=%s", symbol, token) // Retrieve the stock data resp, err := http.Get(url) if err != nil { fmt.Println(err) return } defer resp.Body.Close() // Read the response body body, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println(err) return } // Unmarshal the JSON data into a struct var data StockData err = json.Unmarshal(body, &data) if err != nil { fmt.Println(err) return } // Format data with timestamp tsData := StockDataWithTime{ Symbol: symbol, Price: data.Price, Timestamp: time.Now().UnixNano() / 1000000, } jsonData, err := json.Marshal(tsData) if err != nil { fmt.Println(err) return } topic := fmt.Sprintf("topic_%s", symbol) err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: jsonData, }, nil) if err != nil { fmt.Printf("Failed to produce message: %s\n", err) } fmt.Printf("Message published to Kafka: %s", string(jsonData)) time.Sleep(30 * time.Second) } } To start streaming the data, run the code: $ go run main.go To also get data for SPY, open up another terminal window, modify the code for the symbol to SPY and run the code as well with the token value set. Result After running the producer code, it will print out messages that it sends to Kafka like: Message published to Kafka: {“symbol”:”TSLA”,”price”:174.48,”timestamp”:1678743220215} . This data is sent to the Kafka topic topic_TSLA and sent to QuestDB via the Kafka Connect sink. We can then navigate to localhost:9000 to access the QuestDB console. Searching for all records in the topic_TSLA table, we can see our real-time market quotes: SELECT * FROM ‘topic_TSLA’ We can also look at SPY data from topic_SPY : SELECT * FROM ‘topic_SPY’ With the data now in QuestDB, we can query for aggregate information by getting the average price over a 2m window: SELECT avg(price), timestamp FROM topic_SPY SAMPLE BY 2m; Conclusion Kafka is a trusted component of data pipelines handling large amounts of time series data such as financial data. Kafka can be used to stream mission-critical source data to multiple destinations, including time series databases suited for real-time analytics. In this article, we created a reference implementation of how to poll real-time market data and use Kafka to stream that to QuestDB via Kafka Connect. For more information on the QuestDB Kafka connector, check out the overview page on the QuestDB website. It lists more information on the configuration details and FAQs on setting it up. The GitHub repo for the connector also has sample projects including a Node.js and Java example for those looking to extend this reference architecture.
Miguel Garcia
VP of Engineering,
Nextail Labs
Gautam Goswami
Founder,
DataView
Alexander Eleseev
Full Stack Developer,
First Line Software
Ben Herzberg
Chief Scientist,
Satori