
Big Data

Big data comprises datasets that are massive, varied, and complex, and that can't be handled with traditional tools. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.

Latest Refcards and Trend Reports
Trend Report: Data Pipelines
Refcard #378: Apache Kafka Patterns and Anti-Patterns
Refcard #371: Data Pipeline Essentials

DZone's Featured Big Data Resources

Introduction to Elasticsearch
By Rama Krishna Panguluri
What Is Elasticsearch?

Elasticsearch is a highly scalable, distributed search and analytics engine built on top of the Apache Lucene search library. It is designed to handle large volumes of structured, semi-structured, and unstructured data, making it well suited for a wide range of use cases, including search engines, log analysis, e-commerce, and security analytics.

Elasticsearch uses a distributed architecture that allows it to store and process large volumes of data across multiple nodes in a cluster. Data is indexed and stored in shards, which are distributed across nodes for improved scalability and fault tolerance. Elasticsearch also supports real-time search and analytics, allowing users to query and analyze data in near real time.

One of the key features of Elasticsearch is its powerful search capability. It supports a wide range of search queries, including full-text search and geospatial search, and it provides advanced analytics features such as aggregations, metrics, and data visualization.

Elasticsearch is often used in conjunction with other tools in the Elastic Stack, including Logstash for data collection and processing and Kibana for data visualization and analysis. Together, these tools provide a comprehensive solution for search and analytics that can be applied to a wide range of applications and use cases.

What Is Apache Lucene?

Apache Lucene is an open-source search library that provides powerful text search and indexing capabilities. It is widely used by developers and organizations to build search applications, ranging from search engines to e-commerce platforms.

Lucene works by indexing the text content of documents and storing the index in a structured format that can be searched efficiently. The index is composed of a series of inverted lists, which map terms to the documents that contain them. When a search query is submitted, Lucene uses the index to quickly retrieve the documents that match the query.

In addition to its core search and indexing capabilities, Lucene provides a range of advanced features, including support for fuzzy search and spatial search. It also provides tools for highlighting search results and ranking them by relevance. Lucene is used by a wide range of organizations and projects, including Elasticsearch. Its rich set of features, flexibility, and extensibility make it a popular choice for building search applications of all kinds.

What Is an Inverted Index?

Lucene's inverted index is a data structure used to efficiently search and retrieve text data from a collection of documents. The inverted index is a central feature of Lucene; it stores the terms and their associated documents that make up the index.

The inverted index provides several benefits over other search strategies. First, it allows for fast and efficient retrieval of documents based on search terms. Second, it can handle large amounts of text data, making it well suited for use cases with large collections of documents. Finally, it supports a wide range of advanced search features, such as fuzzy matching and stemming, that can improve the accuracy and relevance of search results.
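To make the idea concrete, here is a toy sketch of an inverted index in Python. It is purely illustrative, not Lucene's actual on-disk format, and the sample documents and query terms are invented.

Python
# Toy inverted index: map each term to the set of document IDs that contain it.
docs = {
    1: "elasticsearch is built on lucene",
    2: "lucene provides an inverted index",
    3: "elasticsearch scales horizontally",
}

inverted_index = {}
for doc_id, text in docs.items():
    for term in text.split():
        inverted_index.setdefault(term, set()).add(doc_id)

# Documents matching all query terms are the intersection of each term's posting set.
query = ["elasticsearch", "lucene"]
matches = set.intersection(*(inverted_index.get(t, set()) for t in query))
print(matches)  # {1}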
Why Elasticsearch?

There are several reasons why Elasticsearch is a popular choice for building search and analytics applications:

Easy to scale (distributed): Elasticsearch is built to scale horizontally out of the box. Whenever you need to increase capacity, just add more nodes and let the cluster reorganize itself to take advantage of the extra hardware. One server can hold one or more parts of one or more indexes, and whenever new nodes are introduced to the cluster, they are simply added to the party. Every such index, or part of it, is called a shard, and Elasticsearch shards can be moved around the cluster very easily.

Everything is one JSON call away (RESTful API): Elasticsearch is API driven. Almost any action can be performed using a simple RESTful API with JSON over HTTP, and responses are always in JSON format.

Unleashed power of Lucene under the hood: Elasticsearch uses Lucene internally to build its state-of-the-art distributed search and analytics capabilities. Since Lucene is a stable, proven technology that continuously gains new features and refinements, Elasticsearch benefits directly from having it as its underlying engine.

Excellent Query DSL: The REST API exposes a very capable query DSL that is easy to use. Every query is just a JSON object that can contain practically any type of query, or even several of them combined. Using filtered queries, with some queries expressed as Lucene filters, helps leverage caching and thus speeds up common queries or complex queries with reusable parts.

Multi-tenancy: Multiple indexes can be stored on one Elasticsearch installation, whether a node or a cluster, and you can query multiple indexes with one simple query.

Support for advanced search features (full text): Elasticsearch uses Lucene under the covers to provide the most powerful full-text search capabilities available in any open-source product. Search comes with multi-language support, a powerful query language, support for geolocation, context-aware did-you-mean suggestions, autocomplete, and search snippets. Scripts are supported in filters and scorers.

Configurable and extensible: Many Elasticsearch configurations can be changed while Elasticsearch is running, but some require a restart (and, in some cases, re-indexing). Most configurations can be changed through the REST API as well.

Document oriented: Store complex real-world entities in Elasticsearch as structured JSON documents. All fields are indexed by default, and all the indices can be used in a single query to return results at breathtaking speed.

Schema free: Elasticsearch allows you to get started easily. Send a JSON document, and it will try to detect the data structure, index the data, and make it searchable.

Conflict management: Optimistic version control can be used where needed to ensure that data is never lost due to conflicting changes from multiple processes.

Active community: Beyond creating useful tools and plugins, the community is very helpful and supportive. The overall vibe is great, and this is an important metric for any OSS project. There are also books being written by community members and many blog posts around the net sharing experiences and knowledge.
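As a quick illustration of the JSON-over-HTTP interface and the Query DSL, the sketch below indexes a document and runs a match query with plain HTTP calls. It assumes a local, security-disabled cluster on localhost:9200, and the index name and fields are made up.

Python
import requests

ES = "http://localhost:9200"  # assumption: local single-node cluster, security disabled

# Index a JSON document; Elasticsearch infers a mapping if none exists.
requests.put(f"{ES}/products/_doc/1", json={"name": "coffee mug", "price": 7.99})
requests.post(f"{ES}/products/_refresh")  # make the document searchable immediately

# A full-text search expressed in the Query DSL; the response is JSON as well.
query = {"query": {"match": {"name": "mug"}}}
resp = requests.get(f"{ES}/products/_search", json=query)
for hit in resp.json()["hits"]["hits"]:
    print(hit["_id"], hit["_score"], hit["_source"])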
Elasticsearch Architecture

The main components of the Elasticsearch architecture are:

Node: A node is an instance of Elasticsearch that stores data and provides search and indexing capabilities. Nodes can be configured as master nodes, data nodes, or both. Master nodes are responsible for cluster-wide management, while data nodes store the data and perform search operations.

Cluster: A cluster is a group of one or more nodes working together to store and process data. A cluster can contain multiple indices (collections of documents) and shards (a way to distribute data across multiple nodes).

Index: An index is a collection of documents that share a similar structure. Each document is represented as a JSON object and contains one or more fields. Elasticsearch indexes all fields by default, making it easy to search and analyze data.

Shards: An index can be split into multiple shards, which are essentially smaller subsets of the index. Sharding allows for the parallel processing of data and distributed storage across multiple nodes.

Replicas: Elasticsearch can create replicas of each shard to provide fault tolerance and high availability. Replicas are copies of the original shard and can be located on different nodes.

Data Node Cluster Architecture

Data nodes are responsible for storing and indexing data, as well as performing search and aggregation operations. The architecture is designed to be scalable and distributed, allowing for horizontal scaling by adding more nodes to the cluster. The main components of an Elasticsearch data node cluster architecture are:

Data node: An instance of Elasticsearch that stores data and provides search and indexing capabilities. In a data node cluster, each node is responsible for storing a portion of the index data and serving search queries against that data.

Cluster state: A data structure that holds information about the cluster, including the list of nodes, indices, shards, and their locations. The master node is responsible for maintaining the cluster state and distributing it to all other nodes in the cluster.

Discovery and transport: Nodes in an Elasticsearch cluster communicate with each other using two protocols. The discovery protocol is responsible for detecting nodes joining or leaving the cluster, and the transport protocol is responsible for sending and receiving data between nodes.

Index Request

An index request is routed to the primary shard that owns the document and then replicated to that shard's replicas.

Who Is Using Elasticsearch?

A few of the companies and organizations that use Elasticsearch:

Netflix: Netflix uses Elasticsearch to power its search and recommendation engine, allowing users to quickly find content to watch.

GitHub: GitHub uses Elasticsearch to provide fast and efficient search across its code repositories, issues, and pull requests.

Uber: Uber uses Elasticsearch to power its real-time analytics platform, allowing it to track and analyze data on its ride-hailing service in real time.

Wikipedia: Wikipedia uses Elasticsearch to power its search engine and provide fast and accurate search results to users.
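Finally, to tie the architecture terms above together, here is a short sketch of creating an index with an explicit number of primary shards and replicas through the REST API. The index name and counts are arbitrary, and a local unsecured cluster is assumed.

Python
import requests

ES = "http://localhost:9200"  # assumption: local cluster, security disabled

# Create an index spread over two primary shards, each with one replica copy.
settings = {"settings": {"number_of_shards": 2, "number_of_replicas": 1}}
resp = requests.put(f"{ES}/sensor-readings", json=settings)
print(resp.json())  # acknowledgement from the cluster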
Kafka: The Basics
By Pavel Micka
Data synchronization is one of the most important aspects of any product. Apache Kafka is one of the most popular choices when designing a system that expects near-real-time propagation of large volumes of data. Even though Kafka has simple yet powerful semantics, working with it requires insight into its architecture. This article summarizes the most important design aspects of Kafka as a broker and of the applications that act as data producers or consumers.

About Kafka

Apache Kafka originated at LinkedIn, where it was developed as a highly scalable distribution system for telemetry and usage data. Over time, Kafka evolved into a general-purpose streaming data backbone that combines high throughput with low data delivery latencies.

Internally, Kafka is a distributed log. A (commit) log is an append-only data structure: producers append data (log records) to its end, and subscribers read the log from the beginning to replay the records. This data structure is used, for example, in a database's write-ahead log. "Distributed log" means that the data structure is not hosted on a single node but is spread across many nodes to achieve both high availability and high performance.

Internals and Terminology

Before we jump into how applications use Kafka, let's quickly go through the basic terminology and architecture so we understand the guarantees that Kafka provides to its users.

A single Kafka node is called a broker. The broker receives messages from producers and distributes them to consumers. Producers send messages into distributed logs called topics (in traditional messaging, this corresponds to a queue). To scale the performance of a single topic beyond the capacity of a single node, each topic may be split into multiple partitions. To achieve high availability and durability of the stored data, each partition has a leader (performing all read and write operations) and multiple followers. Partitions are assigned to brokers automatically, and broker failover is also automatic and transparent to developers using Kafka. On the backend, the assignment of leader/replica roles is orchestrated using leader election in Apache ZooKeeper or, in newer versions of Kafka, the KRaft protocol.

In the diagram, we can see a Kafka cluster that consists of five brokers. In this scenario, two topics (A and B) were created. Topic A has two partitions, while topic B has only a single partition. The cluster was set up with a replication factor of 3: there are always three copies of the data stored, allowing two nodes to fail without losing data. A replication factor of 3 is a sane default, since it guarantees tolerance of a node failure even during the maintenance of one other broker.

You may ask why topic A was divided into two partitions; what is the benefit? First, notice that the leader of partition 1 is on a different node than the leader of partition 2. This means that if clients produce or consume data to or from this topic, they may use the disk throughput and performance of two nodes instead of one. On the other hand, there is a cost to this decision: message ordering is guaranteed only within a single partition.

Producers and Consumers

Now that we have some understanding of how Kafka works internally, let's look at the situation from the perspective of producers and consumers.

Producer

Let's start with the producer.
As mentioned above, replication and the assignment of topics and partitions are concerns of Kafka itself and are not visible to producers or consumers. So the producer only needs to know which topics it wishes to send data to and whether those topics have multiple partitions.

If the topic is partitioned, the producer may provide, as part of its code, a "partitioner": a simple class that decides to which partition a given record belongs. In Kafka, partitioning is therefore driven by the producer. If the producer does not specify any partitioner (but the topic is partitioned), a round-robin strategy is used.

Round-robin is completely fine for entities where exact ordering is not important, that is, where there is no causal relation between the records. For example, if you have a topic with sensor measurements, these measurements may be sent by the sensors on a scheduled basis, so there is no particular order to the records, and round-robin provides an easy way to balance them among the individual partitions.

Our example with sensors also shows another important detail: there may be multiple producers sending records into one topic. In the diagram above, we see many sensors (producers) creating two types of records: humidity (in green) and CO2 concentration (in red). Each record also contains information about the sensor itself, such as its serial number (an integer is used here for the sake of simplicity). Because every sensor ever produced can measure humidity, while only some sensors support CO2 measurements, the designers of the system decided to split the humidity records into two partitions using the serial number of the sensor. Notice that there is strict ordering within each of the humidity partitions (and within the CO2 partition), but there is no ordering of records between the partitions. In other words, B will always be processed before D and E, and A will always be processed before C, but there is no ordering guarantee between B and A (or C).
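As a sketch of how this looks in code (using the third-party kafka-python client; the broker address, topic name, keys, and values are illustrative), the producer below keys each record by sensor serial number, so the default hash partitioner keeps each sensor's records in a single partition, much like the partitioning described above.

Python
import json
from kafka import KafkaProducer

# Assumptions: a broker on localhost:9092 and an existing "humidity" topic
# with more than one partition.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Keying by serial number keeps all records of one sensor in the same partition,
# so per-sensor ordering is preserved.
producer.send("humidity", key="sensor-42", value={"humidity": 40.2})
producer.send("humidity", key="sensor-7", value={"humidity": 38.9})
producer.flush()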
Consumer

A Kafka consumer is an application that reads records from a topic. Kafka adds one more concept here: the consumer group, a set of consumers that cooperate. When multiple consumers from the same group subscribe to the same topic, Kafka distributes the partitions among the consumers of that group so that each partition is read exactly once (a single consumer may read multiple partitions, but one partition will never be read by multiple consumers from the same group). If some of the consumers fail, Kafka automatically reassigns their partitions to other consumers (note that consumers do not need to subscribe to all topics).

But in case of a failover or switchover, how does Kafka know where to continue? We have already said that a topic contains all the messages, even those that were already read. Does this mean that the consumer must read the whole topic again? The answer is that a consumer is able to continue where the previous one stopped. Kafka uses a concept called an offset, essentially a pointer to a message in the partition, which stores the position of the last message processed by a given consumer group.

Offset

While it may seem trivial, the combination of offsets and distributed logs is extremely powerful. It is possible to dynamically add new consumers, and these consumers (starting from offset=0) are able to catch up with the full history of data. With traditional queues, by contrast, the consumers would need to fetch all the data by some other means, because messages are deleted once read in classic messaging. That data sync is more complex: either the producer produces the messages into the one queue used for increments (affecting all other consumers), or the consumer needs to use some other mechanism (such as REST or another dedicated queue), which creates data synchronization issues because two independent, unsynchronized mechanisms are used.

Another huge benefit is that the consumer may at any time decide to reset the offset and read from the beginning of time. Why would one do that? First, there is a class of analytical applications (such as machine learning) that requires processing the whole dataset, and an offset reset provides exactly that mechanism. Second, it may happen that a bug in the consumer corrupts the data. In this case, the consumer's product team can fix the issue and reset the offset, effectively reprocessing the whole dataset and replacing the corrupt data with correct data. This mechanism is heavily used in Kappa architecture.
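The sketch below (again with the kafka-python client; broker, topic, and group names are made up, and process() is a hypothetical handler) shows a consumer in a consumer group that commits its offsets explicitly and could rewind its partitions to replay history.

Python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "humidity",                          # topic (illustrative)
    bootstrap_servers="localhost:9092",  # assumption: local broker
    group_id="trends-service",           # consumer group (illustrative)
    enable_auto_commit=False,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for record in consumer:
    process(record.value)   # hypothetical processing function
    consumer.commit()       # store the offset for this consumer group

# To reprocess the full history, a consumer can rewind its assigned partitions:
# consumer.seek_to_beginning(*consumer.assignment())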
Retention and Compaction

We stated above that the commit log is append-only, but this does not imply that the log is immutable. In fact, that is true only for certain types of deployments where it is necessary to hold the full history of all changes (for auditing purposes, or to have a true kappa architecture). This strategy is powerful but also has a price. First, performance: the consumer needs to go through huge volumes of data in order to get to the top of the log. Second, if the log contains any sensitive information, it is hard to get rid of it, which makes this type of log unfriendly to regulations that require data to be erased on request.

In many cases, however, logs have a fixed retention, either size-based or time-based. In this case, the log contains only a window of messages, and any overflow is automatically erased. Using the log as a buffer keeps its size reasonable and also ensures that the data does not stay in the log forever (making it easier to adhere to compliance requirements). However, this also makes the log unusable for certain use cases, for example when you want all the records to be available to newly subscribed consumers.

The last type of log is the so-called compacted log. In a compacted log, each record has not only a value but also a key. Whenever a new record is appended to the topic and there is already a record with the same key, Kafka will eventually compact the log and erase the original record. Be aware that this means that for a certain time there will be multiple records with the same key, and the up-to-date value is always in the most recently inserted record. This does not require any additional handling if you go with at-least-once semantics (it is guaranteed that the message will be delivered, but in case of any uncertainty, for example due to network issues, the message may be delivered multiple times).

You can picture the compacted log as a streaming form of a database that allows anyone to subscribe to the newest data. This image of Kafka is an accurate one, because there is a duality between a stream and a table: both concepts are merely different views of the same thing. In a SQL database we also use tables, but under the hood there is a commit log. Similarly, any Kafka topic (compacted ones included) can be viewed as a table. In fact, the Kafka Streams library builds on this duality, and there is even ksqlDB (Kafka SQL), which allows you to issue SQL statements over records in Kafka.

In the topology above, we see that the inbound measurement topics (temperature, humidity, CO2, and so on) are normal topics with retention set to seven days. The retention allows the developers to travel a week back in time in case they find a bug in their implementation. From these inbound topics, the data is read by two services (each in a separate consumer group). The measurements history service stores the telemetry in a time-series database (long-term storage), which may be used as a source for graphs and widgets in the UI of the system. The trends service aggregates the data (creating 24-hour windows of the measurements in a given room) so it can be used by downstream controllers, and sends the results through a compacted topic. The topic is compacted because there is no need to keep historical records (only the latest trend is valid). On the other hand, the customer may add a new device (and associated controller) at any time, so we want to ensure that the latest readings for a given room are always present.

Patterns and Principles

In the previous paragraphs, we presented the basic concepts. In this section, we'll expand on those and discuss a few other Kafka patterns.

Eventually Consistent Architecture

In a data synchronization architecture based on messaging, we want to ensure that whenever new data is produced in one product, it becomes available to all relevant products in near real time. This means that if a user creates or modifies some entity in product A and navigates to product B, they should (ideally) see the up-to-date version of that entity. However, since the individual products use multiple independent databases, it is not practical to have a distributed transaction mechanism and atomic consistency between these databases. Instead, we go with eventual consistency. In this model, the data producer is responsible for publishing any record it creates, updates, or deletes to Kafka, from which an interested consumer may retrieve the record and store it locally.

This propagation between systems takes some time: less than a second (expected) between the publishing of the record and the moment it is available to subscribers. The consumer may also optimize writes to its database (e.g., batch writes). During this period, some of the systems (the replicas) have slightly stale data. It may also happen that some of the replicas will not be able to catch up for some time (downtime, network partition). But structurally, it is guaranteed that all the systems will eventually converge to the same results and hold a consistent dataset, hence the term "eventual consistency."

Optimizing Writes to the Local Database

As alluded to in the previous paragraph, consumers may want to optimize writes to their local database. For example, it is highly undesirable to commit on a per-record basis in relational databases, because a transaction commit is a relatively expensive operation. It may be much wiser to commit in batches (say, every 5,000 records or at most every 500 ms, whichever comes first). Kafka supports this well, because committing an offset is in the hands of the consumer. Another example is AWS Redshift, a data warehouse/OLAP database in which commits are very expensive. In Redshift, every commit also invalidates its query caches.
As a result, the cluster takes the hit of the commit twice: once to perform the commit itself, and a second time when all previously cached queries must be re-evaluated. For these reasons, you may want to commit to Redshift (and similar technologies) on a scheduled basis, every X minutes, to limit the blast radius of the operation. A last example is NoSQL databases that do not support transactions; there it may be just fine to stream the data on a per-record basis (depending, obviously, on the capabilities of the database engine). The takeaway: different replicas may use slightly different persistence strategies, even if they consume the same data. Always assume that the other side might not have the data available yet.

Referential Integrity Between Topics

It is important to understand that since Kafka-based data synchronization is eventually consistent, there is no implicit referential or causal integrity between the individual topics (or partitions). The consumers should be written so that they expect to receive, for example, measurements for a room they have not received yet. Some consumers may handle this by not showing the data at all until all the dimensions are present (for example, you can't turn on ventilation when you do not know the room). For other systems, the missing reference is not really an issue: the average temperature in the house will be the same regardless of the presence of room details. For these reasons, it may be impractical to impose strict restrictions centrally.

Stateful Processing

Kafka consumers may require stateful processing, such as aggregation, window functions, and deduplication. The state itself may not be of trivial size, or there may be a requirement that, in case of a failure, the replica is able to continue almost instantly. In these cases, storing the results in the consumer's RAM is not the best choice. Luckily, the Kafka Streams library has out-of-the-box support for RocksDB, a high-performance embedded key-value store that can keep its data both in RAM and on disk.

Caching Strategy and Parallelism

Closely related to stateful processing is the caching strategy. By design, Kafka is not well suited to the competing-consumers style of work, because each partition is assigned to exactly one processor. If one wants to implement competing consumers, one needs to create significantly more partitions than there are consumers in the system to emulate that behavior. However, this is not the way parallelism should be handled in Kafka-based systems (if you really need a job queue of unrelated jobs, you will be much better off with RabbitMQ, SQS, ActiveMQ, and the like). Kafka is a stream processing system, and it is expected that the records in one partition relate to each other and should be processed together. The individual partitions act as data shards, and since Kafka guarantees that each partition will be assigned to exactly one consumer, the consumer can be sure that there is no competing processor. It can therefore cache results as it sees fit in its local cache and does not need any distributed caching (such as Redis). If the processor fails or crashes, Kafka simply reassigns the partition to another consumer, which populates its local caches and continues.
This design of stream processing is significantly easier than competing consumers. There is one consideration, though: the partitioning strategy, because it is defined by default by the producer, while different consumers may have mutually incompatible needs. For this reason, it is common in the Kafka world to re-partition a topic. In our scenario, it would work the following way: in the diagram, we can see that the Trends service produces trends into its topic. This topic is round-robin partitioned and compacted. ProductX, which focuses on large industrial customers, needs to partition the data in some other way, for example by customerId. In this case, ProductX may write a simple application that re-partitions the data (re-partitioning is often managed under the hood by the Kafka Streams library). In other words, it reads the data from the source topic and writes it into another topic, managed by ProductX, which partitions the data differently (per business unit in this case). With this partitioning, ProductX is able to shard the non-overlapping business units onto dedicated processing nodes, massively increasing processing parallelism. The internal ProductX topic may have a short retention (such as 24 hours), because it does not hold the authoritative copy of the data, and the data can easily be replayed from the original topic if necessary.

In Kafka Streams, you may want to join several entities in order to combine the data (this is a common use case). Beware that if you have multiple consumers, the inbound topics need to be partitioned in exactly the same way: the same (join-key-based) partitioner and the same number of partitions. Only then is it guaranteed that entities with matching join keys will be received by the same consumer (processor).

Summary

In this article, we discussed the overall architecture of Kafka and how this low-level architecture allows the broker to scale horizontally easily (thanks to partitioning) and to ensure high availability and durability (thanks to the leader/replica design and leader election). We also went through the basics of designing Kafka-based topologies, explained eventual consistency and how it affects the guarantees given to our applications, and learned how to use Kafka's different types of logs and their retention. While Kafka may seem overwhelming at first, it is important to realize that internally it is based on the good old distributed log. This relatively simple internal structure is what gives Kafka its straightforward semantics, high throughput, and low data propagation latencies, qualities that are crucial for building any data pipeline.
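As a companion to the re-partitioning discussion above, here is a minimal sketch (kafka-python client; topic names, group name, and the customerId field are illustrative, and customerId is assumed to be a string) that copies records from a shared topic into a product-owned topic keyed by customer, so the default partitioner shards them per customer.

Python
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "trends",                              # shared source topic (illustrative)
    bootstrap_servers="localhost:9092",
    group_id="productx-repartitioner",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Republish every record keyed by customerId; records with the same key land in
# the same partition of the internal, short-retention topic.
for record in consumer:
    trend = record.value
    producer.send("productx.trends.by-customer", key=trend["customerId"], value=trend)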
User Data Governance and Processing Using Serverless Streaming
By Maharshi Jha
Apache Kafka + Apache Flink = Match Made in Heaven
By Kai Wähner CORE
How to Stream Sensor Data to Apache Pinot for Real Time Analysis
By David G. Simmons CORE
Handling Bad Messages via DLQ by Configuring JDBC Kafka Sink Connector

Any trustworthy data streaming pipeline needs to be able to identify and handle faults. This is especially true when IoT devices continuously ingest critical data/events into permanent persistent storage, such as an RDBMS, for future analysis via a multi-node Apache Kafka cluster. There could be scenarios where IoT devices send faulty or bad events for various reasons at the source, and appropriate actions can then be executed to correct them.

The Apache Kafka architecture does not include any filtering or error handling mechanism within the broker, so that maximum performance and scale can be achieved. Instead, this is included in Kafka Connect, the integration framework of Apache Kafka. As a default behavior, if a problem arises as a result of consuming an invalid message, the Kafka Connect task terminates, and the same applies to the JDBC Sink Connector.

Kafka Connect connectors fall into two categories: Source (to ingest data from various data-generation sources and transport it to the topic) and Sink (to consume data/messages from the topic and eventually send them to various destinations). Without implementing a strict filtering mechanism or exception handling, we can publish messages to the Kafka topic, including wrongly formatted ones, because a Kafka topic accepts all messages or records as byte arrays in key-value pairs. But by default, the Kafka Connect task stops if an error occurs because of consuming an invalid message, and on top of that, the JDBC sink connector additionally won't work if there is ambiguity in the message schema.

The biggest difficulty with the JDBC sink connector is that it requires knowledge of the schema of the data that has already landed on the Kafka topic. Schema Registry must therefore be integrated as a separate component with the existing Kafka cluster to transfer the data into the RDBMS. To sink data from the Kafka topic to the RDBMS, the producers must publish messages/data containing the schema. (Streaming data via the Kafka JDBC Sink Connector without leveraging Schema Registry is covered in a separate article.)

Since Apache Kafka 2.0, Kafka Connect has included error management features, such as the ability to reroute messages to a dead letter queue. In a Kafka cluster, a dead letter queue (DLQ) is a straightforward topic that serves as the destination for messages that, for some reason, were unable to reach their intended recipients; for the JDBC sink connector, those recipients are tables in the RDBMS.

There are two major reasons why the JDBC Kafka sink connector might stop working abruptly while consuming messages from the topic:

Ambiguity between data types and the actual payload
Junk data in the payload or a wrong schema

Configuring a DLQ for the JDBC sink connector is not complicated. The following parameters need to be added to the sink configuration (.properties) file:

errors.tolerance=all
errors.deadletterqueue.topic.name=<<Name of the DLQ Topic>>
errors.deadletterqueue.topic.replication.factor=<<Number of replicas>>

Note: the number of replicas should be equal to or less than the number of Kafka brokers in the cluster. The DLQ topic is created automatically with the above-mentioned replication factor when we start the JDBC sink connector for the first time.
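For orientation, a complete sink configuration with the DLQ settings in place might look roughly like the sketch below. Every value here is hypothetical (connector name, topic, connection URL, and credentials), so adapt it to your own cluster and database.

name=jdbc-sink-with-dlq
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=iot-events
connection.url=jdbc:postgresql://localhost:5432/iotdb
connection.user=postgres
connection.password=********
auto.create=true
insert.mode=insert

errors.tolerance=all
errors.deadletterqueue.topic.name=iot-events-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true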
When an error occurs or bad data is encountered by the JDBC sink connector while consuming messages from the topic, these unprocessed messages or bad data are forwarded directly to the DLQ, while correct messages or data continue to be written to the respective RDBMS tables. If bad messages are encountered again in between, they are likewise forwarded to the DLQ, and so on.

After the bad or erroneous messages land on the DLQ, we have two options: either manually inspect each message to understand the root cause of the error, or implement a mechanism to reprocess the bad messages and eventually push them to their consumers (for the JDBC sink connector, the destination being the RDBMS tables). Dead letter queues are not enabled by default in Kafka Connect for this reason. Even though Kafka Connect supports several error-handling strategies, such as dead letter queues, silently ignoring, and failing fast, adopting a DLQ is the best approach when configuring the JDBC sink connector. Completely decoupling the handling of bad or error messages from the normal transport of messages from the Kafka topic boosts the overall efficiency of the entire system and allows the development team to build an independent error handling mechanism that is easy to maintain.

Hope you have enjoyed this read. Please like and share if you feel this composition is valuable.

By Gautam Goswami CORE
Delta, Hudi, and Iceberg: The Data Lakehouse Trifecta

As data becomes increasingly important for businesses, the need for scalable, efficient, and cost-effective data storage and processing solutions is more critical than ever. Data Lakehouses have emerged as a powerful tool to help organizations harness the benefits of both Data Lakes and Data Warehouses. In the first article, we highlighted key benefits of Data Lakehouses for businesses, while the second article delved into the architectural details. In this article, we will focus on three popular Data Lakehouse solutions: Delta Lake, Apache Hudi, and Apache Iceberg. We will explore the key features, strengths, and weaknesses of each solution to help you make an informed decision about the best fit for your organization's data management needs.

Data Lakehouse Innovations: Exploring the Genesis and Features of Delta Lake, Apache Hudi, and Apache Iceberg

The three Data Lakehouse solutions we will discuss in this article (Delta Lake, Apache Hudi, and Apache Iceberg) have all emerged to address the challenges of managing massive amounts of data and providing efficient query performance for big data workloads. Although they share some common goals and characteristics, each solution has its unique features, strengths, and weaknesses.

Delta Lake was created by Databricks and is built on top of Apache Spark, a popular distributed computing system for big data processing. It was designed to bring ACID transactions, scalable metadata handling, and unification of batch and streaming data processing to Data Lakes. Delta Lake has quickly gained traction in the big data community due to its compatibility with a wide range of data platforms and tools, as well as its seamless integration with the Apache Spark ecosystem.

Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source project developed by Uber to efficiently manage large-scale analytical datasets on Hadoop-compatible distributed storage systems. Hudi provides upserts and incremental processing capabilities to handle real-time data ingestion, allowing for faster data processing and improved query performance. With its flexible storage and indexing mechanisms, Hudi supports a wide range of analytical workloads and data processing pipelines.

Apache Iceberg is an open table format for large-scale, high-performance data management, initially developed by Netflix. Iceberg aims to provide a more robust and efficient foundation for data lake storage, addressing the limitations of existing storage solutions like Apache Hive and Apache Parquet. One of its most significant innovations is a flexible and powerful schema evolution mechanism, which allows users to evolve a table schema without rewriting existing data. Iceberg also focuses on improving metadata management, making it scalable and efficient for very large datasets.

Each of these solutions has evolved in response to specific needs and challenges in the big data landscape, and they all bring valuable innovations to the Data Lakehouse concept. In the following sections, we will delve into the technical aspects of each solution, examining their data storage and file formats, data versioning and history, data processing capabilities, query performance optimizations, and the technologies and infrastructure required for their deployment.

Navigating Delta Lake: Key Aspects of Data Storage, Processing, and Access

Delta Lake employs the open-source Parquet file format, a columnar storage format optimized for analytical workloads.
It enhances the format by introducing an ACID transaction log, which maintains a record of all operations performed on the dataset. This transaction log, combined with the file storage structure, ensures reliability and consistency in the data.

Data versioning and history are essential aspects of Delta Lake, enabling users to track changes and roll back to previous versions if necessary. The transaction log records every operation, thus providing a historical view of the data and allowing for time-travel queries.

Delta Lake ensures efficient query performance by implementing various optimization techniques. One such technique is data compaction, which combines small files into larger ones to improve read performance. Furthermore, it employs a mechanism called Z-Ordering to optimize the organization of data on disk, which reduces the amount of data read during queries.

For data access, Delta Lake provides a simple and unified API to read and query data from its tables. You can use time-travel queries to access historical versions of your data or perform complex analytical operations using the supported query engines.

To store data in Delta Lake format, data must first be processed and saved in the appropriate file format. Here's an example code snippet for writing data to a Delta Lake table using Apache Spark, followed by reading it back:

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Delta Lake Write and Read Example") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0") \
    .getOrCreate()

# Read data from a source, e.g., a CSV file
data = spark.read.format("csv").load("/path/to/csv-file")

# Write data to a Delta Lake table
data.write.format("delta") \
    .mode("overwrite") \
    .save("/path/to/delta-lake-table")

# Read data from the Delta Lake table
delta_data = spark.read.format("delta") \
    .load("/path/to/delta-lake-table")

# Perform some transformations and actions on the data
result = delta_data.filter("some_condition").groupBy("some_column").count()
result.show()

In the code snippet above, we use the "delta" format to write and read data to and from a Delta Lake table. The Delta Lake library is included in the Spark session by adding the "io.delta:delta-core_2.12:2.3.0" package to the "spark.jars.packages" configuration.

Delta Lake supports a wide range of query engines, including Apache Spark, Databricks Runtime, and Presto. It also provides APIs for programming languages such as Scala, Python, SQL, and Java, enabling seamless integration with existing data processing pipelines. Delta Lake integrates with various data platforms and tools, such as Apache Hive, Apache Flink, and Apache Kafka. In terms of deployment, it can be utilized in on-premises environments as well as in cloud platforms like AWS, Azure, and GCP. For storage, Delta Lake can work with distributed file systems like HDFS or cloud-based storage services such as Amazon S3, Azure Data Lake Storage, and Google Cloud Storage.
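The text mentions time-travel queries without showing one; as an illustrative sketch (same SparkSession and hypothetical table path as above), Delta Lake exposes them through the versionAsOf and timestampAsOf read options:

Python
# Read the table as of an earlier version number recorded in the transaction log
previous = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/path/to/delta-lake-table")

# Or as of a point in time (the timestamp value is illustrative)
snapshot = spark.read.format("delta") \
    .option("timestampAsOf", "2023-05-01 00:00:00") \
    .load("/path/to/delta-lake-table")

previous.show()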
Data Management in Apache Hudi: Exploring Its Core Components

Apache Hudi is another powerful Data Lakehouse solution that provides efficient data storage and querying capabilities. Like Delta Lake, it also uses Parquet as its underlying file format and adds a transaction log for ACID compliance. Hudi's storage management system enables upserts, incremental processing, and rollback support, allowing for efficient data ingestion and access.

One of the key aspects of Apache Hudi is its built-in support for data partitioning, which helps optimize query performance by reducing the amount of data scanned during query execution. Hudi also provides a mechanism called "indexing" to enable fast record-level lookups, updates, and deletes.

Hudi supports various query engines, including Apache Spark, Apache Hive, and Presto, and offers APIs for languages like Scala, Python, SQL, and Java. This flexibility ensures seamless integration with your existing data processing infrastructure.

To write and read data using Apache Hudi, you can use the following code snippet:

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Apache Hudi Write and Read Example") \
    .config("spark.jars", "path/to/hudi-spark-bundle.jar") \
    .getOrCreate()

# Read data from a source, e.g., a CSV file
data = spark.read.format("csv").load("/path/to/csv-file")

def get_hudi_options():
    # Minimal Hudi write options; table, key, and precombine fields are placeholders.
    return {
        "hoodie.table.name": "hudi_table",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "ts",
    }

# Write data to an Apache Hudi table
data.write.format("org.apache.hudi") \
    .options(**get_hudi_options()) \
    .mode("overwrite") \
    .save("/path/to/hudi-table")

# Read data from the Apache Hudi table
hudi_data = spark.read.format("org.apache.hudi") \
    .load("/path/to/hudi-table/*")

# Perform some transformations and actions on the data
result = hudi_data.filter("some_condition").groupBy("some_column").count()
result.show()

In the example above, the "org.apache.hudi" format is specified for writing data to an Apache Hudi table. The required Hudi library is added to the Spark session by specifying the "hudi-spark-bundle.jar" path in the "spark.jars" configuration.

Apache Iceberg Basics: A Journey Through Data Management Fundamentals

Apache Iceberg is a relatively new addition to the Data Lakehouse landscape. It is an open table format that provides strong consistency, snapshot isolation, and efficient query performance. Like Delta Lake and Apache Hudi, Iceberg also uses Parquet as its underlying file format and builds additional features on top of it.

Iceberg's schema evolution mechanism is one of its most significant innovations. It allows users to evolve a table schema without the need to rewrite existing data. This capability makes it possible to add, delete, or update columns in a table while preserving the existing data layout.

Another key aspect of Iceberg is its scalable and efficient metadata management system. It uses a combination of manifest files and metadata tables to store information about table data, making it easier to manage large datasets. Iceberg optimizes query performance by employing techniques like predicate pushdown, which reduces the amount of data read during query execution.

Iceberg supports a variety of query engines, including Apache Spark, Apache Flink, and Trino (formerly known as PrestoSQL). It also provides APIs for programming languages such as Scala, Python, SQL, and Java, ensuring seamless integration with your existing data processing infrastructure.
To write and read data using Apache Iceberg, you can use the following code snippet:

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Apache Iceberg Write and Read Demonstration") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark3-runtime:0.13.2") \
    .getOrCreate()

# Load data from a source, such as a CSV file
data = spark.read.format("csv").load("/path/to/csv-file")

# Write data to an Apache Iceberg table
data.write.format("iceberg") \
    .mode("overwrite") \
    .save("iceberg_catalog_namespace.table_name")

# Load data from the Apache Iceberg table
iceberg_data = spark.read.format("iceberg") \
    .load("iceberg_catalog_namespace.table_name")

# Apply transformations and actions to the data
result = iceberg_data.filter("some_condition").groupBy("some_column").count()
result.show()

In the example above, the "iceberg" format is specified for writing data to an Apache Iceberg table. The Iceberg library is included in the Spark session by adding the "org.apache.iceberg:iceberg-spark3-runtime:0.13.2" package to the "spark.jars.packages" configuration.

Iceberg can be deployed in on-premises environments or cloud platforms like AWS, Azure, and GCP. It supports various storage systems, including distributed file systems like HDFS and cloud-based storage services such as Amazon S3, Azure Data Lake Storage, and Google Cloud Storage.

Weighing the Pros and Cons: Analyzing Delta Lake, Apache Hudi, and Apache Iceberg

To help you make an informed decision about which Data Lakehouse solution is best for your organization, we have compared the features of Delta Lake, Apache Hudi, and Apache Iceberg using a set of factors. In the table below, each factor is evaluated as supported (+), unsupported (-), or partly supported (±).

Criterion | Delta Lake | Apache Hudi | Apache Iceberg
ACID Transactions | + | + | +
Schema Evolution | + | + | +
Time Travel (Data Versioning) | + | + | +
Data Partitioning | + | + | +
Upserts and Deletes | + | + | +
Incremental Processing | + | + | ±
Data Deduplication | ± | + | ±
Metadata Scalability | + | ± | +
Compaction Management | ± | + | +
Merge on Read and Copy on Write Storage | - | + | -
Query Optimization Techniques | + | ± | +
Support for Multiple Query Engines | + | + | +
Integration with Other Data Platforms and Tools | + | + | +
Cloud-native Storage Compatibility | + | + | +
Ease of Deployment and Management | + | ± | +

This table provides a high-level comparison of the features supported by Delta Lake, Apache Hudi, and Apache Iceberg. It is important to note that each solution has its unique strengths and trade-offs, and the best choice for a specific use case depends on the organization's requirements, existing infrastructure, and familiarity with the technologies involved.

Summing Up the Data Lakehouse Landscape: Key Insights and Analysis

In conclusion, the Data Lakehouse concept has emerged as a promising solution to address the challenges of traditional data warehouses and data lakes, providing a unified platform for scalable, reliable, and efficient data management. As organizations strive to harness the power of their data, selecting the right Data Lakehouse solution becomes crucial for optimizing performance and adaptability. Throughout this comparison, we have examined the key aspects of three prominent Data Lakehouse solutions: Delta Lake, Apache Hudi, and Apache Iceberg. Each of these solutions has its unique strengths and trade-offs, catering to a variety of use cases and requirements.
By assessing their data storage, processing, and access capabilities, as well as their integration with existing technologies and infrastructure, organizations can make informed decisions on which solution best aligns with their needs. While the comparison table highlights the high-level differences between Delta Lake, Apache Hudi, and Apache Iceberg, it is essential to consider the specific requirements and constraints of each organization. Factors such as ease of deployment, compatibility with current infrastructure, and familiarity with the underlying technologies can significantly impact the success of a Data Lakehouse implementation. In our next article, we will delve deeper into the technologies used for implementing Data Lakehouses, exploring the underlying mechanisms, tools, and best practices that can help organizations optimize their data management strategies. Ultimately, the choice between Delta Lake, Apache Hudi, and Apache Iceberg will depend on a careful evaluation of their respective features, trade-offs, and alignment with the organization's objectives. By thoroughly understanding the capabilities of each solution, organizations can ensure a future-proof Data Lakehouse infrastructure that facilitates data-driven decision-making and unlocks new insights to drive business growth.

By Andrey Gusarov
Lambda Architecture: A Powerful Approach to Data Engineering

In the world of big data and analytics, processing and managing vast amounts of data efficiently and effectively is a critical challenge. Data engineers play a pivotal role in designing and implementing solutions to handle this data deluge. One such approach that has gained popularity in recent years is the Lambda Architecture, a powerful framework for building scalable and robust data processing pipelines. In this article, we will explore the Lambda Architecture in detail, understanding its key concepts, benefits, and challenges.

What Is Lambda Architecture?

The Lambda Architecture is a data processing architecture that combines batch processing with real-time/stream processing to handle large volumes of data in a distributed and fault-tolerant manner. It was introduced by Nathan Marz in his book "Big Data: Principles and Best Practices of Scalable Real-Time Data Systems" and has since become a widely adopted approach in the field of data engineering.

The Lambda Architecture follows a "speed layer" and "batch layer" approach, where data is processed in parallel through both layers and the results are combined to produce a single output. The speed layer deals with real-time data processing and provides low-latency responses, while the batch layer handles large-scale data processing and provides comprehensive results. The combination of both layers allows for near-real-time processing of incoming data while also supporting historical data analysis.

Key Concepts of Lambda Architecture

The Lambda Architecture is based on a few fundamental concepts that make it unique and powerful:

Batch layer: The batch layer is responsible for processing and analyzing large volumes of data in batch mode. It can handle data in a distributed and parallel manner, making it highly scalable. Typically, it uses batch processing frameworks such as Apache Hadoop or Apache Spark to process data stored in distributed file systems like the Hadoop Distributed File System (HDFS) or Amazon S3. The batch layer generates batch views, which are immutable, historical representations of the data.

Speed layer: The speed layer is responsible for processing and analyzing real-time data streams in near real time. It deals with high-velocity data and provides low-latency responses. It uses stream processing frameworks such as Apache Kafka or Apache Flink to process data in real time as it arrives. The speed layer generates real-time views, which are continuously updated and provide up-to-date insights.

Serving layer: The serving layer is responsible for serving the results generated by the batch and speed layers to end users. It combines batch views and real-time views to provide a comprehensive view of the data. The serving layer uses technologies like Apache Cassandra or Apache HBase to store and serve the computed results in a distributed and fault-tolerant manner.

Data lake: The data lake is a central repository that stores all the raw and processed data. It acts as the source of truth for the Lambda Architecture, providing a scalable and durable storage solution for all the data ingested into the system. Popular data lake technologies include Apache Hadoop, Amazon S3, and Google Cloud Storage.
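To make the layer interaction concrete, here is a toy sketch of what the serving layer conceptually does: answer a query by merging an immutable batch view with the speed layer's incremental view. All names and numbers are invented for illustration.

Python
# Batch view: precomputed page-view counts from the last batch run over the data lake.
batch_view = {"page_a": 10_000, "page_b": 7_500}

# Real-time view: counts accumulated by the speed layer since that batch run.
realtime_view = {"page_a": 42, "page_c": 3}

def serve(page: str) -> int:
    """Merge both views to answer a query with near-real-time accuracy."""
    return batch_view.get(page, 0) + realtime_view.get(page, 0)

print(serve("page_a"))  # 10042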
Benefits of Lambda Architecture

The Lambda Architecture offers several benefits that make it a popular choice for data engineering:

Scalability: The Lambda Architecture is highly scalable, as it can process large volumes of data in a distributed and parallel manner. This makes it suitable for handling big data workloads and allows for horizontal scaling as data volumes grow.

Fault tolerance: The Lambda Architecture is designed to be fault-tolerant, as it replicates data across multiple nodes and uses distributed file systems and databases. This ensures high availability and data durability, even in the presence of hardware failures or other issues.

Real-time processing: The Lambda Architecture allows for the processing of real-time data streams, providing low-latency responses and enabling near-real-time analytics. This is crucial for use cases that require real-time insights and actions, such as fraud detection, anomaly detection, recommendation systems, and IoT applications.

Flexibility: The Lambda Architecture provides flexibility in data processing, as it allows for both batch processing and real-time/stream processing. This enables organizations to handle a wide variety of data types, including structured and unstructured data, and process them in a way that best suits their needs.

Data integrity: The Lambda Architecture ensures data integrity by maintaining immutable batch views and continuously updated real-time views. This makes it easier to trace and audit changes in the data over time, ensuring data consistency and reliability.

Extensibility: The Lambda Architecture is highly extensible, as it allows for incorporating new data sources, processing frameworks, or analytics algorithms as needed. This makes it adaptable to changing business requirements and evolving data landscapes.

Challenges of Lambda Architecture

While the Lambda Architecture offers many benefits, it also comes with some challenges:

Complexity: The Lambda Architecture can be complex to implement and manage, as it requires a combination of batch processing, real-time/stream processing, and serving layer technologies. This may require specialized skills and expertise in different technologies, making it challenging to set up and maintain.

Data consistency: Maintaining consistency between batch views and real-time views can be challenging, as batch processing and real-time/stream processing may produce different results due to differences in processing times and windowing techniques. Ensuring data consistency across both layers requires careful attention to data synchronization and versioning.

System complexity: The Lambda Architecture introduces additional complexity in managing and monitoring multiple layers, such as the batch layer, speed layer, serving layer, and data lake. This may require sophisticated monitoring, logging, and alerting mechanisms to ensure smooth operations and timely issue detection.

Operational overhead: Managing a distributed and fault-tolerant system like the Lambda Architecture may require additional operational overhead, such as setting up and managing clusters, monitoring performance, optimizing resource utilization, and handling failures. This may require additional resources and effort to manage the system effectively.

Conclusion

The Lambda Architecture is a powerful approach to data engineering that combines batch processing and real-time/stream processing to handle large volumes of data in a distributed and fault-tolerant manner. It offers benefits such as scalability, fault tolerance, real-time processing, flexibility, data integrity, and extensibility. However, it also comes with challenges such as complexity, data consistency, system complexity, and operational overhead.
Organizations need to carefully consider their specific requirements, resources, and expertise before implementing the Lambda Architecture. When implemented correctly, the Lambda Architecture can provide a robust and scalable solution for processing big data and generating valuable insights in real time and in batch mode.
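To make the batch, speed, and serving layers described above more concrete, here is a minimal, framework-agnostic sketch. All names (compute_batch_view, update_speed_view, serve_query) and the running-total example are illustrative assumptions rather than anything prescribed by the architecture; in a real system, the batch view would be produced by a framework such as Hadoop or Spark, and the real-time view by a stream processor.
Python
from collections import defaultdict

# Batch layer: recompute an immutable view from the full, historical master dataset.
def compute_batch_view(master_dataset):
    view = defaultdict(float)
    for event in master_dataset:
        view[event["user"]] += event["amount"]
    return dict(view)

# Speed layer: incrementally update a real-time view as new events arrive.
def update_speed_view(speed_view, event):
    speed_view[event["user"]] = speed_view.get(event["user"], 0.0) + event["amount"]

# Serving layer: answer queries by merging the batch view with the real-time view.
def serve_query(user, batch_view, speed_view):
    return batch_view.get(user, 0.0) + speed_view.get(user, 0.0)

history = [{"user": "alice", "amount": 10.0}, {"user": "bob", "amount": 5.0}]
batch_view = compute_batch_view(history)

speed_view = {}
update_speed_view(speed_view, {"user": "alice", "amount": 2.5})  # a new real-time event

print(serve_query("alice", batch_view, speed_view))  # 12.5
The point of the sketch is the serving-layer merge: a query is answered by combining the immutable batch view with the continuously updated real-time view, which is exactly the combination of layers described above.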

By Amlan Patnaik
Integrate Apache Spark and QuestDB for Time-Series Analytics

Spark is an analytics engine for large-scale data engineering. Despite its long history, it still has its well-deserved place in the big data landscape. QuestDB, on the other hand, is a time-series database with a very high data ingestion rate. This means that Spark desperately needs data, a lot of it! ...and QuestDB has it, a match made in heaven. Of course, there are pandas for data analytics! The key here is the expression large-scale. Unlike pandas, Spark is a distributed system and can scale really well. What does this mean exactly? Let's take a look at how data is processed in Spark. For the purposes of this article, we only need to know that a Spark job consists of multiple tasks, and each task works on a single data partition. Tasks are executed in parallel in stages, distributed on the cluster. Stages have a dependency on the previous ones; tasks from different stages cannot run in parallel. The schematic diagram below depicts an example job: In this tutorial, we will load data from a QuestDB table into a Spark application and explore the inner workings of Spark to refine data loading. Finally, we will modify and save the data back to QuestDB. Loading Data to Spark First things first, we need to load time-series data from QuestDB. I will use an existing table, trades, with just over 1.3 million rows. It contains Bitcoin trades spanning over 3 days: not exactly a big data scenario but good enough to experiment. The table contains the following columns: Column Name Column Type symbol SYMBOL side SYMBOL price DOUBLE amount DOUBLE timestamp TIMESTAMP The table is partitioned by day, and the timestamp column serves as the designated timestamp. QuestDB accepts connections via the Postgres wire protocol, so we can use JDBC to integrate. You can choose from various languages to create Spark applications, and here we will go for Python. Create the script, sparktest.py: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() # print the number of rows print(df.count()) # do some filtering and print the first 3 rows of the data df = df.filter(df.symbol == 'BTC-USD').filter(df.side == 'buy') df.show(3, truncate=False) Believe it or not, this tiny application already reads data from the database when submitted as a Spark job. Shell spark-submit --jars postgresql-42.5.1.jar sparktest.py The job prints the following row count: Shell 1322570 And these are the first 3 rows of the filtered table: Shell +-------+----+--------+---------+--------------------------+ |symbol |side|price |amount |timestamp | +-------+----+--------+---------+--------------------------+ |BTC-USD|buy |23128.72|6.4065E-4|2023-02-01 00:00:00.141334| |BTC-USD|buy |23128.33|3.7407E-4|2023-02-01 00:00:01.169344| |BTC-USD|buy |23128.33|8.1796E-4|2023-02-01 00:00:01.184992| +-------+----+--------+---------+--------------------------+ only showing top 3 rows Although sparktest.py speaks for itself, it is still worth mentioning that this application has a dependency on the JDBC driver located in postgresql-42.5.1.jar. It cannot run without this dependency; hence, it has to be submitted to Spark together with the application. Optimizing Data Loading With Spark We have loaded data into Spark. 
Now we will look at how this was completed and some aspects to consider. The easiest way to peek under the hood is to check QuestDB's log, which should tell us how Spark interacted with the database. We will also make use of the Spark UI, which displays useful insights of the execution, including stages and tasks. Spark Connection to QuestDB: Spark Is Lazy QuestDB log shows that Spark connected three times to the database. For simplicity, I only show the relevant lines in the log: Shell 2023-03-21T21:12:24.031443Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:24.060520Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:12:24.072262Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Spark first queried the database when we created the DataFrame, but as it turns out, it was not too interested in the data itself. The query looked like this: SELECT * FROM trades WHERE 1=0 The only thing Spark wanted to know was the schema of the table in order to create an empty DataFrame. Spark evaluates expressions lazily and only does the bare minimum required at each step. After all, it is meant to analyze big data, so resources are incredibly precious for Spark. Especially memory: data is not cached by default. The second connection happened when Spark counted the rows of the DataFrame. It did not query the data this time, either. Interestingly, instead of pushing the aggregation down to the database by running SELECT count(*) FROM trades, it just queried a 1 for each record: SELECT 1 FROM trades. Spark adds the 1s together to get the actual count. Shell 2023-03-21T21:12:25.692098Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:25.693863Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT 1 FROM trades ] 2023-03-21T21:12:25.695284Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:12:25.749986Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:12:25.800765Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:12:25.881785Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Working with the data itself eventually forced Spark to get a taste of the table's content too. Filters are pushed down to the database by default. Shell 2023-03-21T21:12:26.132570Z I pg-server connected [ip=127.0.0.1, fd=28] 2023-03-21T21:12:26.134355Z I i.q.c.p.PGConnectionContext parse [fd=28, q=SELECT "symbol","side","price","amount","timestamp" FROM trades WHERE ("symbol" IS NOT NULL) AND ("side" IS NOT NULL) AND ("symbol" = 'BTC-USD') AND ("side" = 'buy') ] 2023-03-21T21:12:26.739124Z I pg-server disconnected [ip=127.0.0.1, fd=28, src=queue] We can see that Spark's interaction with the database is rather sophisticated and optimized to achieve good performance without wasting resources. The Spark DataFrame is the key component that takes care of the optimization, and it deserves some further analysis. What Is a Spark DataFrame? The name DataFrame sounds like a container to hold data, but we have seen earlier that this is not really true. So, what is a Spark DataFrame, then? 
One way to look at Spark SQL, with the risk of oversimplifying it, is that it is a query engine. df.filter(predicate) is really just another way of saying WHERE predicate. With this in mind, the DataFrame is pretty much a query, or actually more like a query plan. Most databases come with functionality to display query plans, and Spark has it too! Let's check the plan for the DataFrame we just created above: Python df.explain(extended=True) Shell == Parsed Logical Plan == Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Analyzed Logical Plan == symbol: string, side: string, price: double, amount: double, timestamp: timestamp Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Optimized Logical Plan == Filter ((isnotnull(symbol#0) AND isnotnull(side#1)) AND ((symbol#0 = BTC-USD) AND (side#1 = buy))) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Physical Plan == *(1) Scan JDBCRelation(trades) [numPartitions=1] [symbol#0,side#1,price#2,amount#3,timestamp#4] PushedFilters: [*IsNotNull(symbol), *IsNotNull(side), *EqualTo(symbol,BTC-USD), *EqualTo(side,buy)], ReadSchema: struct<symbol:string,side:string,price:double,amount:double,timestamp:timestamp> If the DataFrame knows how to reproduce the data by remembering the execution plan, it does not need to store the actual data. This is precisely what we have seen earlier. Spark desperately tried not to load our data, but this can have disadvantages too. Caching Data Not caching the data radically reduces Spark's memory footprint, but there is a bit of jugglery here. Data does not have to be cached because the plan printed above can be executed again and again and again... Now imagine how a mere decently-sized Spark cluster could make our lonely QuestDB instance suffer martyrdom. With a massive table containing many partitions, Spark would generate a large number of tasks to be executed in parallel across different nodes of the cluster. These tasks would query the table almost simultaneously, putting a heavy load on the database. So, if you find your colleagues cooking breakfast on your database servers, consider forcing Spark to cache some data to reduce the number of trips to the database. This can be done by calling df.cache(). In a large application, it might require a bit of thinking about what is worth caching and how to ensure that Spark executors have enough memory to store the data. In practice, you should consider caching smallish datasets used frequently throughout the application's life. Let's rerun our code with a tiny modification, adding .cache(): Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() \ .cache() # print the number of rows print(df.count()) # print the first 3 rows of the data df.show(3, truncate=False) This time Spark hit the database only twice. First, it came for the schema; the second time, for the data: SELECT "symbol","side","price","amount","timestamp" FROM trades. 
Shell 2023-03-21T21:13:04.122390Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:04.147353Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:13:04.159470Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] 2023-03-21T21:13:05.873960Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:05.875951Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT "symbol","side","price","amount","timestamp" FROM trades ] 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:13:08.479209Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Clearly, even a few carefully placed .cache() calls can improve the overall performance of an application, sometimes significantly. What else should we take into consideration when thinking about performance? Earlier, we mentioned that our Spark application consists of tasks, which work on the different partitions of the data in parallel. So, partitioned data means parallelism, which results in better performance. Spark Data Partitioning Now we turn to the Spark UI. It tells us that the job was done in a single task: The truth is that we have already suspected this. The execution plan told us (numPartitions=1), and we did not see any parallelism in the QuestDB logs either. We can display more details about this partition with a bit of additional code: Python from pyspark.sql.functions import spark_partition_id, min, max, count df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-03 23:59:59.801778|1322570 | +-----------+--------------------------+--------------------------+------------------+ The UI helps us confirm that the data is loaded as a single partition. QuestDB stores this data in 3 partitions. We should try to fix this. Although it is not recommended, we can try to use DataFrame.repartition(). This call reshuffles data across the cluster while partitioning the data, so it should be our last resort. After running df.repartition(3, df.timestamp), we see 3 partitions, but not exactly the way we expected. 
The partitions overlap with one another: Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.698152|2023-02-03 23:59:59.801778|438550 | |1 |2023-02-01 00:00:00.078073|2023-02-03 23:59:57.188894|440362 | |2 |2023-02-01 00:00:00.828943|2023-02-03 23:59:59.309075|443658 | +-----------+--------------------------+--------------------------+------------------+ It seems that DataFrame.repartition() used hashes to distribute the rows across the 3 partitions. This would mean that all 3 tasks would require data from all 3 QuestDB partitions. Let's try this instead: df.repartitionByRange(3, "timestamp"): Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 21:22:35.656399|429389 | |1 |2023-02-01 21:22:35.665599|2023-02-02 21:45:02.613339|470372 | |2 |2023-02-02 21:45:02.641778|2023-02-03 23:59:59.801778|422809 | +-----------+--------------------------+--------------------------+------------------+ This looks better but still not ideal. That is because DataFrame.repartitionByRange() samples the dataset and then estimates the borders of the partitions. What we really want is for the DataFrame partitions to match exactly the partitions we see in QuestDB. This way, the tasks running in parallel in Spark do not get in each other's way in QuestDB, which is likely to result in better performance. Data source options to the rescue! Let's try the following: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .option("partitionColumn", "timestamp") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 23:59:59.664083|487309 | |1 |2023-02-02 00:00:00.188002|2023-02-02 23:59:59.838473|457478 | |2 |2023-02-03 00:00:00.565319|2023-02-03 23:59:59.801778|377783 | +-----------+--------------------------+--------------------------+------------------+ After specifying partitionColumn, numPartitions, lowerBound, and upperBound, the situation is much better: the row counts in the partitions match what we have seen in the QuestDB logs earlier: rowCount=487309, rowCount=457478 and rowCount=377783. Looks like we did it! 
Shell 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] We can check the Spark UI again; it also confirms that the job was completed in 3 separate tasks, each of them working on a single partition. Sometimes it might be tricky to know the minimum and maximum timestamps when creating the DataFrame. In the worst case, you could query the database for those values via an ordinary connection.
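If you would rather not hard-code those bounds, here is a minimal sketch of such an ordinary connection, assuming the psycopg2 driver is installed (QuestDB speaks the Postgres wire protocol on port 8812, as used throughout this article). The returned values can then be formatted and passed as the lowerBound and upperBound options:
Python
import psycopg2

# Query the designated timestamp range over a plain Postgres-wire connection.
conn = psycopg2.connect(host="localhost", port=8812, user="admin",
                        password="quest", dbname="questdb")
with conn.cursor() as cur:
    cur.execute("SELECT min(timestamp), max(timestamp) FROM trades")
    lower_bound, upper_bound = cur.fetchone()
conn.close()

# These are datetime objects; format them (e.g. with isoformat()) before
# passing them to the lowerBound/upperBound options.
print(lower_bound, upper_bound)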
We have managed to replicate our QuestDB partitions in Spark, but data does not always come from a single table. What if the data required is the result of a query? Can we load that, and is it possible to partition it? Options to Load Data: SQL Query vs. Table We can use the query option to load data from QuestDB with the help of a SQL query: Python # 1-minute aggregated trade data df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("query", "SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR") \ .load() Depending on the amount of data and the actual query, you might find that pushing the aggregations down to QuestDB is faster than completing them in Spark. Spark definitely has an edge when the dataset is really large. Now let's try partitioning this DataFrame with the options we used before with the dbtable option. Unfortunately, we will get an error message: Shell Options 'query' and 'partitionColumn' can not be specified together. However, we can trick Spark by just giving the query an alias name. This means we can go back to using the dbtable option again, which lets us specify partitioning. 
See the example below: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.ts), max(df.ts), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+-------------------+-------------------+------------------+ |partitionId|min(ts) |max(ts) |count(partitionId)| +-----------+-------------------+-------------------+------------------+ |0 |2023-02-01 00:00:00|2023-02-01 23:59:00|1440 | |1 |2023-02-02 00:00:00|2023-02-02 23:59:00|1440 | |2 |2023-02-03 00:00:00|2023-02-03 23:59:00|1440 | +-----------+-------------------+-------------------+------------------+ Looking good. Now it seems that we can load any data from QuestDB into Spark by passing a SQL query to the DataFrame. Do we, really? Our trades table is limited to three data types only. What about all the other types you can find in QuestDB? We expect that Spark will successfully map a double or a timestamp when queried from the database, but what about a geohash? It is not that obvious what is going to happen. As always, when unsure, we should test. Type Mappings I have another table in the database with a different schema. This table has a column for each type currently available in QuestDB. SQL CREATE TABLE all_types ( symbol SYMBOL, string STRING, char CHAR, long LONG, int INT, short SHORT, byte BYTE, double DOUBLE, float FLOAT, bool BOOLEAN, uuid UUID, --long128 LONG128, long256 LONG256, bin BINARY, g5c GEOHASH(5c), date DATE, timestamp TIMESTAMP ) timestamp (timestamp) PARTITION BY DAY; INSERT INTO all_types values('sym1', 'str1', 'a', 456, 345, 234, 123, 888.8, 11.1, true, '9f9b2131-d49f-4d1d-ab81-39815c50d341', --to_long128(10, 5), rnd_long256(), rnd_bin(10,20,2), rnd_geohash(35), to_date('2022-02-01', 'yyyy-MM-dd'), to_timestamp('2022-01-15T00:00:03.234', 'yyyy-MM-ddTHH:mm:ss.SSS')); long128 is not fully supported by QuestDB yet, so it is commented out. Let's try to load and print the data; we can also take a look at the schema of the DataFrame: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) Much to my surprise, Spark managed to create the DataFrame and mapped all types. 
Here is the schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DoubleType(), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', TimestampType(), True), StructField('timestamp', TimestampType(), True) ]) It looks pretty good, but you might wonder if it is a good idea to map long256 and geohash types to String. QuestDB does not provide arithmetics for these types, so it is not a big deal. Geohashes are basically 32-base numbers, represented and stored in their string format. The 256-bit long values are also treated as string literals. long256 is used to store cryptocurrency private keys. Now let's see the data: Shell +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double|float|bool|uuid | +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8 |11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+-------------------+-----------------------+ |g5c |date |timestamp | +-----+-------------------+-----------------------+ |q661k|2022-02-01 00:00:00|2022-01-15 00:00:03.234| +-----+-------------------+-----------------------+ It also looks good, but we could omit the 00:00:00 from the end of the date field. We can see that it is mapped to Timestamp and not Date. We could also try to map one of the numeric fields to Decimal. This can be useful if later we want to do computations that require high precision. We can use the customSchema option to customize the column types. 
Our modified code: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .option("customSchema", "date DATE, double DECIMAL(38, 10)") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) The new schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DecimalType(38,10), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', DateType(), True), StructField('timestamp', TimestampType(), True) ]) And the data is displayed as: Shell +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double |float|bool|uuid | +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8000000000|11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+----------+-----------------------+ |g5c |date |timestamp | +-----+----------+-----------------------+ |q661k|2022-02-01|2022-01-15 00:00:03.234| +-----+----------+-----------------------+ It seems that Spark can handle almost all database types. The only issue is long128, but this type is a work in progress currently in QuestDB. When completed, it will be mapped as String, just like long256. Writing Data Back Into the Database There is only one thing left: writing data back into QuestDB. In this example, first, we will load some data from the database and add two new features: 10-minute moving average standard deviation, also calculated over the last 10-minute window Then we will try to save the modified DataFrame back into QuestDB as a new table. 
We need to take care of some type mappings as Double columns are sent as FLOAT8 to QuestDB by default, so we end up with this code: Python from pyspark.sql import SparkSession from pyspark.sql.window import Window from pyspark.sql.functions import avg, stddev, when # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() # add new features window_10 = Window.partitionBy(df.symbol).rowsBetween(-10, Window.currentRow) df = df.withColumn("ma10", avg(df.mid).over(window_10)) df = df.withColumn("std", stddev(df.mid).over(window_10)) df = df.withColumn("std", when(df.std.isNull(), 0.0).otherwise(df.std)) # save the data as 'trades_enriched' df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save() All works but…we soon realize that our new table, trades_enriched is not partitioned and does not have a designated timestamp, which is not ideal. Obviously Spark has no idea of QuestDB specifics. It would work better if we created the table upfront and Spark only saved the data into it. We drop the table and re-create it; this time, it is partitioned and has a designated timestamp: SQL DROP TABLE trades_enriched; CREATE TABLE trades_enriched ( volume DOUBLE, mid DOUBLE, ts TIMESTAMP, ma10 DOUBLE, std DOUBLE ) timestamp (ts) PARTITION BY DAY; The table is empty and waiting for the data. We rerun the code; all works, no complaints. The data is in the table, and it is partitioned. One aspect of writing data into the database is if we are allowed to create duplicates. What if I try to rerun the code again without dropping the table? Will Spark let me save the data this time? No, we get an error: Shell pyspark.sql.utils.AnalysisException: Table or view 'trades_enriched' already exists. SaveMode: ErrorIfExists. The last part of the error message looks interesting: SaveMode: ErrorIfExists. What is SaveMode? It turns out we can configure what should happen if the table already exists. Our options are: errorifexists: the default behavior is to return an error if the table already exists, Spark is playing safe here append : data will be appended to the existing rows already present in the table overwrite: the content of the table will be replaced entirely by the newly saved data ignore: if the table is not empty, our save operation gets ignored without any error We have already seen how errorifexists behaves, append and ignore seem to be simple enough just to work. However, overwrite is not straightforward. The content of the table must be cleared before the new data can be saved. 
Spark will delete and re-create the table by default, which means losing partitioning and the designated timestamp. In general, we do not want Spark to create tables for us. Luckily, with the truncate option we can tell Spark to use TRUNCATE to clear the table instead of deleting it: Python # save the data as 'trades_enriched', overwrite if already exists df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("truncate", True) \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save(mode="overwrite") The above works as expected. Conclusion Our ride might seem bumpy, but we finally have everything working. Our new motto should be "There is a config option for everything!". To summarize what we have found: We can use Spark's JDBC data source to integrate with QuestDB. It is recommended to use the dbtable option, even if we use a SQL query to load data. Always try to specify partitioning options (partitionColumn, numPartitions, lowerBound and upperBound) when loading data; the partitions should ideally match the QuestDB partitions. Sometimes it makes sense to cache some data in Spark to reduce the number of trips to the database. It can be beneficial to push work down into the database, depending on the task and how much data is involved. It makes sense to make use of QuestDB's time-series-specific features, such as SAMPLE BY, instead of trying to rewrite them in Spark. Type mappings can be customized via the customSchema option when loading data. When writing data into QuestDB, always specify the desired save mode. It generally works better if you create the table upfront and do not let Spark create it, because this way you can add partitioning and a designated timestamp. If you select the overwrite save mode, you should enable the truncate option too to make sure Spark does not delete the table, so partitioning and the designated timestamp are not lost. Type mappings can be customized via the createTableColumnTypes option when saving data. I mentioned only the config options that are most likely to be tweaked when working with QuestDB; the complete set of options can be found here: Spark data source options. What Could the Future Bring? Overall, everything works, but it would be nice to have a much more seamless integration, where partitioning would be taken care of automagically. Some type mappings could use better defaults, too, when saving data into QuestDB. The overwrite save mode could default to using TRUNCATE. More seamless integration is not impossible to achieve. If QuestDB provided its own JDBCDialect implementation for Spark, the above nuances could be handled. We should probably consider adding this. Finally, there is one more thing we did not mention yet, data locality. That is because QuestDB currently cannot run as a cluster. However, we are actively working on a solution — check out The Inner Workings of Distributed Databases for more information. When the time comes, we should ensure that data locality is also considered. Ideally, each Spark node would work on tasks that require partitions loaded from the local (or closest) QuestDB instance. However, this is not something we should be concerned about at this moment... for now, just enjoy data crunching!

By Imre Aranyosi
ActiveMQ JMS (Java Message Service) vs. Data Streaming Kafka With Camel Code Sample

ActiveMQ and Kafka are both messaging systems used for real-time data processing and streaming. Both of these systems are open-source and offer different features that cater to specific use cases. While ActiveMQ JMS and Kafka are both used for message queuing and real-time data processing, there are significant differences between them. ActiveMQ JMS is a traditional message broker that supports multiple messaging protocols such as JMS, AMQP, and MQTT. It is designed to provide reliable message delivery and offers features such as message persistence, clustering, and transaction support. ActiveMQ JMS is commonly used in enterprise systems for mission-critical applications where reliability is of utmost importance. Sample example of consuming messages from a REST API and writing them to ActiveMQ queues: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class RestApiToActiveMq { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from REST API and write them to ActiveMQ RouteBuilder builder = new RouteBuilder() { public void configure() { from("rest:get:/api/messages") .to("activemq:queue:myQueue"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } Sample example of consuming messages from AMQ queues and writing them to a Snowflake data warehouse: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class AmqToSnowflake { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from AMQ and write them to Snowflake RouteBuilder builder = new RouteBuilder() { public void configure() { from("activemq:queue:myQueue") .to("snowflake-jdbc:myDatabase?query=INSERT INTO myTable (message) VALUES (:?message)"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } On the other hand, Kafka is a distributed streaming platform designed for handling large-scale data streaming. It is optimized for horizontal scalability, fault tolerance, and high throughput, making it an excellent choice for big data applications. In addition, Kafka offers features such as real-time data streaming, high-performance messaging, and distributed data storage. 
Sample example of consuming messages from a REST API and writing them to a Kafka topic: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class RestApiToKafka { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from REST API and write them to Kafka RouteBuilder builder = new RouteBuilder() { public void configure() { from("rest:get:/api/messages") .to("kafka:myTopic?brokers=localhost:9092"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } Sample example of consuming messages from a Kafka topic and writing them to a Snowflake data warehouse: Java import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class KafkaToSnowflake { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); // define a route to consume messages from Kafka and write them to Snowflake RouteBuilder builder = new RouteBuilder() { public void configure() { from("kafka:myTopic?brokers=localhost:9092") .to("snowflake-jdbc:myDatabase?query=INSERT INTO myTable (message) VALUES (:?message)"); } }; // add the route to the Camel context context.addRoutes(builder); // start the Camel context context.start(); // keep the program running to continue consuming messages Thread.sleep(Long.MAX_VALUE); // stop the Camel context context.stop(); } } One of the key differences between ActiveMQ JMS and Kafka is their architecture. ActiveMQ JMS is a traditional messaging system that is based on a hub-and-spoke model, where the message broker acts as a centralized hub for all message exchanges. On the other hand, Kafka is designed as a distributed system that uses a publish-subscribe model, where messages are published to a topic, and subscribers consume the messages from that topic. Kafka's distributed architecture provides fault tolerance and high availability, making it an ideal choice for mission-critical applications. Another difference between ActiveMQ JMS and Kafka is their performance. ActiveMQ JMS is designed to provide reliable message delivery with features such as message persistence and transaction support. While this provides a high level of reliability, it can also impact performance. In contrast, Kafka's architecture is designed for high throughput and low latency, making it an excellent choice for real-time data processing and analysis. In terms of use cases, ActiveMQ JMS is an excellent choice for traditional messaging applications where reliability and message ordering are more important than speed. It is commonly used in enterprise systems for mission-critical applications where reliability is of utmost importance. On the other hand, Kafka is an excellent choice for real-time data processing and analysis. It is commonly used for big data applications where high throughput and low latency are critical. In conclusion, both ActiveMQ JMS and Kafka are excellent messaging systems that offer different features for different use cases. ActiveMQ JMS is an excellent choice for traditional messaging applications where reliability is of utmost importance, while Kafka is an excellent choice for real-time data processing and analysis. 
Therefore, it is important to consider the specific requirements of your application when choosing between ActiveMQ JMS and Kafka.

By Kiran Peddireddy
Data Stream Using Apache Kafka and Camel Application

Apache Kafka is an event streaming platform that was developed by LinkedIn and later made open-source under the Apache Software Foundation. Its primary function is to handle high-volume real-time data streams and provide a scalable and fault-tolerant architecture for creating data pipelines, streaming applications, and microservices. Kafka employs a publish-subscribe messaging model, in which data is sorted into topics, and publishers send messages to those topics. Subscribers can then receive those messages in real time. The platform offers a scalable and fault-tolerant architecture by spreading data across multiple nodes and replicating data across multiple brokers. This guarantees that data is consistently available, even if a node fails. Kafka's architecture is based on several essential components, including brokers, producers, consumers, and topics. Brokers manage the message queues and handle message persistence, while producers and consumers are responsible for publishing and subscribing to Kafka topics, respectively. Topics function as the communication channels through which messages are sent and received. Kafka also provides an extensive range of APIs and tools to manage data streams and build real-time applications. Kafka Connect, one of its most popular tools and APIs, enables the creation of data pipelines that integrate with other systems. Kafka Streams, on the other hand, allows developers to build streaming applications using a high-level API. In summary, Kafka is a robust and adaptable platform that can be used to construct real-time data pipelines and streaming applications. It has been widely adopted in various sectors, including finance, healthcare, e-commerce, and more. To create a Kafka data stream using Camel, you can use the Camel-Kafka component, which is already included in Apache Camel. Below are the steps to follow for creating a Kafka data stream using Camel: Prepare a Kafka broker and create a topic for the data stream. Set up a new Camel project on your IDE and include the required Camel dependencies, including the Camel-Kafka component. Create a new Camel route within your project that defines the data stream. The route should use the Kafka component and specify the topic to which the data should be sent or received. Select the appropriate data format for the data stream. For instance, if you want to send JSON data, use the Jackson data format to serialize and deserialize the data. Launch the Camel context and the Kafka producer or consumer to start sending or receiving data. Overall, using the Camel-Kafka component with Apache Camel is a simple way to create data streams between applications and a Kafka cluster. 
Here is the code for reading a table from the DB and writing to a Kafka cluster: Apache Camel Producer Application: Java import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; import org.springframework.stereotype.Component; @Component public class OracleDBToKafkaRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { // Configure Oracle DB endpoint String oracleDBEndpoint = "jdbc:oracle:thin:@localhost:1521:orcl"; String oracleDBUser = "username"; String oracleDBPassword = "password"; String oracleDBTable = "mytable"; String selectQuery = "SELECT * FROM " + oracleDBTable; // Configure Kafka endpoint String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092"; String kafkaSerializer = "org.apache.kafka.common.serialization.StringSerializer"; from("timer:oracleDBPoller?period=5000") // Read from Oracle DB .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword) .setBody(simple(selectQuery)) .split(body()) // Serialize to Kafka .setHeader(KafkaConstants.KEY, simple("${body.id}")) .marshal().string(kafkaSerializer) .to(kafkaEndpoint); } } Here is the code for reading a Kafka topic and writing to the Oracle DB table: Apache Camel Consumer Application: Java import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; import org.springframework.stereotype.Component; @Component public class KafkaToOracleDBRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { // Configure Kafka endpoint String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092"; String kafkaDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"; // Configure Oracle DB endpoint String oracleDBEndpoint = "jdbc:oracle:thin:@localhost:1521:orcl"; String oracleDBUser = "username"; String oracleDBPassword = "password"; String oracleDBTable = "mytable"; from(kafkaEndpoint) // Deserialize from Kafka .unmarshal().string(kafkaDeserializer) .split(body().tokenize("\n")) // Write to Oracle DB .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword) .setBody(simple("INSERT INTO " + oracleDBTable + " VALUES(${body})")) .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword); } }

By Kiran Peddireddy
What Is Advertised Kafka Address?

Let’s start with the basics. After successfully starting a Redpanda or Apache Kafka® cluster, you want to stream data into it right away. No matter what tool and language you choose, you will immediately be asked for a list of bootstrap servers for your client to connect to it. This bootstrap server is just for your client to initiate the connection to one of the brokers in the cluster, which will then provide your client with an initial set of metadata. The metadata tells the client which brokers are currently available and which broker hosts the leader of each partition, so that the client can initiate a direct connection to each broker individually. The diagram below will give you a better idea. The client figures out where to stream the data based on the info given. Depending on the number of partitions and where they are hosted, your client will push or pull from the partitions to/from its host brokers. Both the Kafka address and the advertised Kafka address are needed. The Kafka address is used by Kafka brokers to locate each other, and the advertised address is used by the client to find them. In this post, we’ll help you understand the advertised Kafka address, how to use it in Docker and Kubernetes (K8s), and how to debug it. When To Use Kafka Address and Advertised Kafka Address When starting up your Redpanda cluster, the Kafka address is used to bind the Redpanda service to its host and use the established endpoint to start accepting requests. {LISTENER_NAME}://{HOST_NAME}:{PORT} The broker uses the advertised Kafka address in the metadata, so your client will use that address to locate the other brokers. To set it, use --kafka-addr and --advertise-kafka-addr with rpk, or kafka_api and advertised_kafka_api inside /etc/redpanda/redpanda.yaml for each broker. Until this point, everything seems straightforward. And you might start to wonder whether the Kafka address and advertised Kafka address are actually redundant. It starts to get tricky when your client has no visibility into the cluster host: if you pass the same internal address to the client, it won’t be able to resolve it. So, we need to modify the advertised Kafka address to something the Kafka client can understand and reach from outside (i.e., an external IP). How To Use Kafka Address and Advertised Kafka Address in Docker (Container) Another problem often comes up when running Redpanda brokers in Docker (containers). The same applies to other more complex network topologies. But fear not: you already know the mechanics. All you need to do is put the right address for clients that reside in different places. When you run a Docker container, it creates its own network by default, but in the case where you need to have multiple of them communicating, you will need to set up a network (sometimes even multiple layers of network) by bridging them together. We know that the Kafka address is used for binding to the host, so we’ll just use 0.0.0.0, as it will bind to all interfaces on the host, and any port you wish (do not use an already occupied port). An example could be 0.0.0.0:9092 and 0.0.0.0:9095 for each broker running in a Docker container. Each container registers a name in the network, and if your client is trying to access a broker within the network, all you need to do is set the advertised Kafka address to its registered hostname in the network. For example, if your first Redpanda container registered its name as Building-A, you can set the advertised Kafka address to Building-A:9092. 
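To see the client side of this, here is a minimal sketch using the kafka-python package (an assumption on my part; any Kafka-compatible client behaves the same way, and the topic name is made up). The client is given only a bootstrap address; after the initial metadata exchange, it connects to whatever addresses the brokers advertise, which is why the advertised address must be resolvable from where the client runs:
Python
from kafka import KafkaProducer  # assumes the kafka-python package is installed

# Inside the Docker network, the advertised hostname Building-A is resolvable,
# so it can double as the bootstrap address.
producer = KafkaProducer(bootstrap_servers="Building-A:9092")
producer.send("demo-topic", b"hello from inside the network")
producer.flush()
producer.close()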
For clients outside of the Docker network, where they don’t have access to the network’s routing table, the advertised Kafka address will need to be set to the host that the Docker containers are running on, so the client can find it. And don’t forget that you also need to expose the port and associate that with the host. But what happens if you have clients in both environments that want to access the cluster at the same time? Simple: add multiple listeners! Each listener will return a set of advertised Kafka addresses for clients in a different environment. Here’s a diagram for you. Using Kafka Address and Advertised Kafka Address in K8s Since Kubernetes is a platform that orchestrates containers, the concept is very similar to running Docker containers, but on a larger scale. In a typical Redpanda cluster, you will want to install a single Redpanda broker on an individual worker node. All the pods running in K8s get assigned an internal address that is only visible inside the Kubernetes environment, so if the client is running outside of Kubernetes, it will need a way to find the brokers. So you can use NodePort to expose the port and use the public IP address of the hosting worker node. For the Kafka address, as usual, just bind it to the local container. For example, 0.0.0.0:9092 and 0.0.0.0:9095. As for the advertised Kafka address, we will need to set two listeners: one for internal connections and one for external ones. For internal clients, we can simply use the generated internal service name; for example, if your service name is set to Building-A, the advertised Kafka address will be internal://Building-A:9092. For the external listener, use the hosting worker node’s public IP (or domain name) with the port exposed via NodePort, where you will be assigned a new port. For example, if your first worker node has the public IP (domain) XXX-Blvd, and the new port assigned is 39092, you can set the advertised Kafka address to external://XXX-Blvd:39092. How to Debug the Advertised Kafka Address When you are able to connect to your cluster with Redpanda Keeper (rpk) but your client throws errors like “ENOTFOUND”, check whether advertised_kafka_api is correctly set, with an address that can be resolved by your client. Shell > curl localhost:9644/v1/node_config {"advertised_kafka_api":[{"name":"internal","address":"0.0.0.0","port":9092},{"name":"external","address":"192.186.0.3","port":19092}]....} If you are running Docker, find out which port 9644 was exposed from Docker. Shell > docker port redpanda-0 8081/tcp -> 0.0.0.0:18081 9644/tcp -> 0.0.0.0:19644 18082/tcp -> 0.0.0.0:18082 19092/tcp -> 0.0.0.0:19092 And cURL. Shell > curl localhost:19644/v1/node_config {"advertised_kafka_api":[{"name":"internal","address":"redpanda-0","port":9092},{"name":"external","address":"localhost","port":19092}]....}
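If you prefer to script this check rather than run curl by hand, here is a small sketch using the Python requests package (an assumption on my part); it calls the same admin endpoint and prints each advertised Kafka listener:
Python
import requests

# Same check as the curl commands above; adjust the host and port to your setup,
# e.g. localhost:19644 when Docker remaps the admin port.
resp = requests.get("http://localhost:9644/v1/node_config", timeout=5)
resp.raise_for_status()

for listener in resp.json().get("advertised_kafka_api", []):
    print(f"{listener['name']} -> {listener['address']}:{listener['port']}")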
If you are running Kubernetes, find out what the exposed admin port is. Shell > kubectl get svc redpanda-external -n redpanda NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) redpanda-external NodePort 10.100.87.34 <none> 9644:31644/TCP,9094:31092/TCP,8083:30082/TCP,8084:30081/TCP 3h53m Shell > kubectl get pod -o=custom-columns=NAME:.metadata.name,STATUS:.status.phase,NODE:.spec.nodeName -n redpanda NAME NODE redpanda-0 ip-1-168-57-208.xx.compute.internal redpanda-1 ip-1-168-1-231.xx.compute.internal redpanda-2 ip-1-168-83-90.xx.compute.internal Shell > kubectl get nodes -o=custom-columns='NAME:.metadata.name,IP:.status.addresses[?(@.type=="ExternalIP")].address' NAME IP ip-1.us-east-2.compute.internal 3.12.84.230 ip-1.us-east-2.compute.internal 3.144.255.61 ip-1.us-east-2.compute.internal 3.144.144.138 And cURL. Shell > curl 3.12.84.230:31644/v1/node_config {"advertised_kafka_api":[{"name":"internal","address":"redpanda-1.redpanda.redpanda.svc.cluster.local.","port":9093},{"name":"default","address":"3.12.84.230","port":31092}].....} Lastly, check if you are connecting to the correct listener (port). And you’re all done! Conclusion If you made it this far, you should now have a better understanding of what the advertised Kafka address is and how you can use it in Docker and K8s. To learn more about Redpanda, check out our documentation and browse the Redpanda blog for tutorials and guides on how to easily integrate with Redpanda. For a more hands-on approach, take Redpanda's free Community edition for a test drive!

By Christina Lin CORE
Top 5 Data Streaming Trends for 2023

Data streaming is one of the most relevant buzzwords in tech for building scalable real-time applications in the cloud and innovative business models. Do you wonder about my predicted TOP 5 data streaming trends in 2023 to set data in motion? Check out the following presentation and learn what role Apache Kafka plays. Learn about decentralized data mesh, cloud-native lakehouse, data sharing, improved user experience, and advanced data governance. Some followers might notice that this became a series with past posts about the top 5 data streaming trends for 2021 and the top 5 for 2022. Data streaming with Apache Kafka is a journey and evolution to set data in motion. Trends change over time, but the core value of a scalable real-time infrastructure as the central data hub stays. Gartner Top Strategic Technology Trends for 2023 The research and consulting company Gartner defines the top strategic technology trends every year. This time, the trends are more focused on particular niche concepts. On a higher level, it is all about optimizing, scaling, and pioneering. Here is what Gartner expects for 2023 (source: Gartner). It is funny (but not surprising): Gartner’s predictions overlap with and complement the five trends I focus on for data streaming with Apache Kafka looking forward to 2023. I explore how data streaming enables better time to market with decentralized optimized architectures, cloud-native infrastructure for elastic scale, and pioneering innovative use cases to build valuable data products. Hence, here you go with the top 5 trends in data streaming for 2023. The Top 5 Data Streaming Trends for 2023 I see the following topics coming up more regularly in conversations with customers, prospects, and the broader Kafka community across the globe: Cloud-native lakehouses Decentralized data mesh Data sharing in real time Improved developer and user experience Advanced data governance and policy enforcement The following sections describe each trend in more detail. The end of the article contains the complete slide deck. The trends are relevant for various scenarios, no matter if you use open-source Apache Kafka, a commercial platform, or a fully-managed cloud service like Confluent Cloud. Kafka as Data Fabric for Cloud-Native Lakehouses Many data platform vendors pitch the lakehouse vision today. That's the same story as the data lake in the Hadoop era with a few new nuances. Put all your data into a single data store to save the world and solve every problem and use case: In the last ten years, most enterprises realized this strategy did not work. The data lake is great for reporting and batch analytics, but not the right choice for every problem. Besides technical challenges, new challenges emerged: data governance, compliance issues, data privacy, and so on. Applying a best-of-breed enterprise architecture for real-time and batch data analytics using the right tool for each job is a much more successful, flexible, and future-ready approach: Data platforms like Databricks, Snowflake, Elastic, MongoDB, BigQuery, etc., have their sweet spots and trade-offs. Data streaming increasingly becomes the real-time data fabric between all the different data platforms and other business applications, leveraging the real-time Kappa architecture instead of the much more batch-focused Lambda architecture. 
Decentralized Data Mesh With Valuable Data Products

Focusing on business value by building data products in independent domains with various technologies is key to success in today's agile world with ever-changing requirements and challenges. Data mesh came to the rescue and emerged as a next-generation design pattern, succeeding service-oriented architectures and microservices.

Vendors make two main proposals for building a data mesh: data integration with data streaming enables fully decentralized business products, while data virtualization provides centralized queries. Centralized queries are simple but do not provide a clean architecture with decoupled domains and applications. They might work well to solve a single problem in a project. However, I highly recommend building a decentralized data mesh with data streaming to decouple the applications, especially for strategic enterprise architectures.

Collaboration Within and Across Organizations in Real Time

Collaborating within and outside the organization through data sharing, using open APIs, streaming data exchange, and cluster linking, enables many innovative business models. The difference between data streaming and a database, data warehouse, or data lake is crucial: all of those platforms enable data sharing at rest. The data is stored on a disk before it is replicated and shared within the organization or with partners. That is not real time, and you cannot connect a real-time consumer to data at rest.

However, real-time data beats slow data. Hence, sharing data in real time with data streaming platforms like Apache Kafka or Confluent Cloud delivers accurate data as soon as a change happens. A consumer can be real-time, near real-time, or batch. A streaming data exchange puts data in motion within the organization or for B2B data sharing and open API business models.

AsyncAPI Spec for Apache Kafka API Schemas

AsyncAPI allows developers to define the interfaces of asynchronous APIs. It is protocol agnostic. Features include:

  • Specification of API contracts (= schemas in the data streaming world)
  • Documentation of APIs
  • Code generation for many programming languages
  • Data governance
  • And much more...

Confluent Cloud recently added a feature for generating an AsyncAPI specification for Apache Kafka clusters. We don't know yet where the market is going. Will AsyncAPI become the standard for open API contracts in data streaming? Maybe. I see increasing demand for this specification from customers. Let's review the status of AsyncAPI in a few quarters or years. But it has the potential.

Improved Developer Experience With Low-Code/No-Code Tools for Apache Kafka

Many analysts and vendors pitch low-code/no-code tools. Visual coding is nothing new. Very sophisticated, powerful, and easy-to-use solutions exist as IDEs or cloud applications. The significant benefit is faster time to market for developing applications and easier maintenance, at least in theory. These tools support various personas like developers, citizen integrators, and data scientists, at least in theory. The reality is that:

  • Code is king
  • Development is about evolution
  • Open platforms win

Low code/no code is great for some scenarios and personas, but it is just one option of many. Let's look at a few alternatives for building Kafka-native applications. These Kafka-native technologies have their trade-offs. For instance, the Confluent Stream Designer is perfect for building streaming ETL pipelines between various data sources and sinks: just click the pipeline and transformations together.
Then deploy the data pipeline into a scalable, reliable, and fully managed streaming application. The difference from separate tools like Apache NiFi is that the generated code runs in the same streaming platform, i.e., one infrastructure end-to-end. This makes ensuring SLAs and latency requirements much more manageable and the whole data pipeline more cost-efficient.

However, the simpler a tool is, the less flexible it is. It is as simple as that, no matter which product or vendor you look at, and this is not just true for Kafka-native tools. You are flexible with your tool choice per project or business problem: add your favorite non-Kafka stream processing engine to the stack, for instance, Apache Flink, or use a separate iPaaS middleware like Dell Boomi or SnapLogic.

Domain-Driven Design With Dumb Pipes and Smart Endpoints

The real benefit of data streaming is the freedom of choice for your favorite Kafka-native technology, open-source stream processing framework, or cloud-native iPaaS middleware. Choose the proper library, tool, or SaaS for your project. Data streaming enables a decoupled, domain-driven design with dumb pipes and smart endpoints. Data streaming with Apache Kafka is perfect for domain-driven design (DDD). By contrast, the point-to-point HTTP/REST web services often used in microservice architectures, or push-based message brokers like RabbitMQ, create much stronger dependencies between applications.

Data Governance Across the Data Streaming Pipeline

An enterprise architecture powered by data streaming enables easy access to data in real time. Many enterprises leverage Apache Kafka as the central nervous system between all data sources and sinks. The consequence of being able to access all data easily across business domains is two conflicting pressures on organizations: unlock the data to enable innovation versus lock up the data to keep it safe.

Achieving data governance across the end-to-end data streams with data lineage, event tracing, policy enforcement, and time travel to analyze historical events is critical for strategic data streaming in the enterprise architecture. Data governance on top of the streaming platform is required for end-to-end visibility, compliance, and security.

Policy Enforcement With Schemas and API Contracts

The foundation for data governance is the management of API contracts (so-called schemas in data streaming platforms like Apache Kafka). Solutions like Confluent enforce schemas along the data pipeline, including the data producer, server, and consumer (see the short producer-side sketch at the end of this article). Additional data governance tools like data lineage, catalogs, or policy enforcement are built on this foundation. The recommendation for any serious data streaming project is to use schemas from the beginning. They are not strictly required for the first pipeline, but the following producers and consumers need a trusted environment with enforced policies to establish a decentralized data mesh architecture with independent but connected data products.

Slides and Video for Data Streaming Use Cases in 2023

Here is the slide deck from my presentation, and here is the free on-demand video recording.

Data Streaming Goes Up in the Maturity Curve in 2023

It is still an early stage for data streaming in most enterprises. But the discussion goes beyond questions like "when to use Kafka?" or "which cloud service to use?" In 2023, most enterprises look at more sophisticated challenges around their numerous data streaming projects. The new trends are often related to each other.
A data mesh enables the building of independent data products that focus on business value. Data sharing is a fundamental requirement for a data mesh. New personas access the data stream; often, citizen developers or data scientists need easy tools to pioneer new projects. The enterprise architecture requires and enforces data governance across the pipeline for security, compliance, and privacy reasons. Scalability and elasticity need to be there out of the box. Fully managed data streaming is a brilliant opportunity for getting started in 2023 and moving up the maturity curve from single projects to a central nervous system of real-time data.

What are your most relevant and exciting trends for data streaming and Apache Kafka in 2023 to set data in motion? What are your strategy and timeline?
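To make the policy enforcement point above concrete, here is a minimal producer-side sketch using the confluent-kafka Python client and its Schema Registry Avro serializer. The broker and Schema Registry addresses, topic, and record schema are assumptions for illustration (Avro support also requires the fastavro dependency); it is a sketch of the technique, not a description of any particular vendor setup:

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Hypothetical Avro contract for an "orders" data product.
ORDER_SCHEMA = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})  # assumed address
serializer = AvroSerializer(schema_registry, ORDER_SCHEMA)
producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed address

order = {"order_id": "o-123", "amount": 42.5}
# Serialization validates the event against the registered schema;
# a malformed event raises an error here instead of polluting the topic.
value = serializer(order, SerializationContext("orders", MessageField.VALUE))
producer.produce("orders", value=value)
producer.flush()

Because the serializer validates each event against the registered schema before produce() is called, malformed events are rejected at the edge instead of reaching downstream consumers.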

By Kai Wähner CORE
Using CockroachDB CDC With Apache Pulsar

Previous Articles on CockroachDB CDC

  • Using CockroachDB CDC with Azure Event Hubs
  • Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
  • SaaS Galore: Integrating CockroachDB with Confluent Kafka, Fivetran, and Snowflake
  • CockroachDB CDC using Minio as a cloud storage sink
  • CockroachDB CDC using Hadoop Ozone S3 Gateway as a cloud storage sink

Motivation

Apache Pulsar is a cloud-native distributed messaging and streaming platform. In my customer conversations, it most often comes up when compared to Apache Kafka. I have a customer who needs Pulsar sink support because they rely on Pulsar's multi-region capabilities. CockroachDB does not have a native Pulsar sink; however, the Pulsar project supports the Kafka protocol through Kafka-on-Pulsar (KoP), and that is the core of today's article. This tutorial assumes you have an enterprise license; you can also leverage our managed offerings, where enterprise changefeeds are enabled by default. I am going to demonstrate the steps using a Docker environment instead.

High-Level Steps

  • Deploy Apache Pulsar
  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy a Kafka consumer
  • Verify
  • Conclusion

Step-By-Step Instructions

Deploy Apache Pulsar

Since I'm using Docker, I'm relying on the KoP Docker Compose environment provided by StreamNative, which spearheads the development of Apache Pulsar. I've used the service taken from the KoP example almost as-is, aside from a few differences:

pulsar:
  container_name: pulsar
  hostname: pulsar
  image: streamnative/sn-pulsar:2.11.0.5
  command: >
    bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw" # disable stream storage and functions worker
  environment:
    allowAutoTopicCreationType: partitioned
    brokerDeleteInactiveTopicsEnabled: "false"
    PULSAR_PREFIX_messagingProtocols: kafka
    PULSAR_PREFIX_kafkaListeners: PLAINTEXT://pulsar:9092
    PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
    PULSAR_PREFIX_webServicePort: "8088"
  ports:
    - 6650:6650
    - 8088:8088
    - 9092:9092

I removed PULSAR_PREFIX_kafkaAdvertisedListeners: PLAINTEXT://127.0.0.1:19092 because I don't need it, and I changed the exposed port from - 19092:9092 to - 9092:9092. My PULSAR_PREFIX_kafkaListeners address points to a Docker container with the hostname pulsar; I need to access that address from other containers and can't rely on localhost. I'm also using a more recent version of the image than the one in the docs.

Deploy a CockroachDB Cluster With Enterprise Changefeeds

I am using a 3-node cluster in Docker. If you've followed my previous articles, you should be familiar with it. I am using Flyway to set up the schema and seed the tables. The actual schema and data are taken from the changefeed examples in our docs; the only difference is that I'm using a database called example. To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;

Again, if you don't have an enterprise license, you won't be able to complete this tutorial. Feel free to use our Dedicated or Serverless instances if you want to follow along. Finally, after the tables and the data are in place, we can create a changefeed on these tables:

CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://pulsar:9092';

Here I am using the Kafka port and the address of the Pulsar cluster, in my case pulsar.
        job_id
----------------------
  855538618543276035
(1 row)

NOTICE: changefeed will emit to topic office_dogs
NOTICE: changefeed will emit to topic employees

Time: 50ms total (execution 49ms / network 1ms)

Everything seems to work, and the changefeed does not error out.

Deploy a Kafka Consumer

To validate that data is being written to Pulsar, we need to stand up a Kafka client. I've created an image that downloads and installs Kafka. Once the entire Docker Compose environment is running, we can access the client and run the console consumer to verify:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic office_dogs --from-beginning
{"after": {"id": 1, "name": "Petee H"}}
{"after": {"id": 2, "name": "Carl"}}

If we want to validate that new data is flowing, let's insert another record into CockroachDB:

INSERT INTO office_dogs VALUES (3, 'Test');

The consumer will print a new row:

{"after": {"id": 3, "name": "Test"}}

Since we've created two topics, let's now look at the employees topic:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic employees --from-beginning
{"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 855539880336523267}}
{"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 855539880336654339}}

Similarly, let's update a record and see the changes propagate to Pulsar:

UPDATE employees SET employee_name = 'Spencer Kimball' WHERE dog_id = 2;
{"after": {"dog_id": 2, "employee_name": "Spencer Kimball", "rowid": 855539880336654339}}

Verify

We've confirmed we can produce messages to Pulsar topics using the Kafka protocol via KoP, and we've confirmed we can consume them using the Kafka console consumer. We can also use the native Pulsar tooling to confirm the data is consumable from Pulsar. I installed the Pulsar Python client (pip install pulsar-client) on the Kafka client machine and created a Python script with the following code:

import pulsar

client = pulsar.Client('pulsar://pulsar:6650')
consumer = client.subscribe('employees', subscription_name='my-sub')

while True:
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)

client.close()

I execute the script:

root@kafka-client:/opt/kafka# python3 consume_messages.py
2023-04-11 14:17:21.761 INFO [281473255101472] Client:87 | Subscribing on Topic :employees
2023-04-11 14:17:21.762 INFO [281473255101472] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.762 INFO [281473255101472] ConnectionPool:97 | Created connection for pulsar://pulsar:6650
2023-04-11 14:17:21.763 INFO [281473230237984] ClientConnection:388 | [172.28.0.3:33826 -> 172.28.0.6:6650] Connected to broker
2023-04-11 14:17:21.771 INFO [281473230237984] HandlerBase:72 | [persistent://public/default/employees-partition-0, my-sub, 0] Getting connection from pool
2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.776 INFO [281473230237984] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:390 | [172.28.0.3:33832 -> 172.28.0.6:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2023-04-11 14:17:21.786 INFO [281473230237984] ConsumerImpl:238 | [persistent://public/default/employees-partition-0, my-sub, 0] Created consumer on broker [172.28.0.3:33832 -> 172.28.0.6:6650]
2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:274 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0
2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:137 | Successfully Subscribed to Topics

Let's insert and then update a record in the employees table:

INSERT INTO employees (dog_id, employee_name) VALUES (3, 'Test');
UPDATE employees SET employee_name = 'Artem' WHERE dog_id = 3;

The Pulsar client output is as follows:

Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}}''
Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}}''

Conclusion

This is how you can leverage existing CockroachDB capabilities with non-standard services like Apache Pulsar. Hopefully, you've found this article useful and can start leveraging the existing Kafka sink with non-standard message brokers.
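Because KoP exposes the standard Kafka wire protocol, any Kafka client library should also be able to read these changefeed topics programmatically, not just the console consumer. As an optional extra that is not part of the setup above, a consumer using the confluent-kafka Python package against the same pulsar:9092 listener could look roughly like this:

from confluent_kafka import Consumer

# Assumes the confluent-kafka package is installed and the script runs
# inside the Docker network, where the broker is reachable as pulsar:9092.
consumer = Consumer({
    "bootstrap.servers": "pulsar:9092",
    "group.id": "cdc-readers",          # hypothetical consumer group
    "auto.offset.reset": "earliest",    # read the changefeed from the start
})
consumer.subscribe(["employees"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error:", msg.error())
            continue
        print("Received change event:", msg.value().decode("utf-8"))
finally:
    consumer.close()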

By Artem Ervits CORE
Processing Time Series Data With QuestDB and Apache Kafka

Apache Kafka is a battle-tested distributed stream-processing platform popular in the financial industry to handle mission-critical transactional workloads. Kafka's ability to handle large volumes of real-time market data makes it a core infrastructure component for trading, risk management, and fraud detection. Financial institutions use Kafka to stream data from market data feeds, transaction data, and other external sources to drive decisions.

A common data pipeline to ingest and store financial data involves publishing real-time data to Kafka and utilizing Kafka Connect to stream that data into databases. For example, the market data team may continuously publish real-time quotes for a security to Kafka, and the trading team may consume that data to make buy/sell orders. Processed market data and orders may then be saved to a time series database for further analysis.

In this article, we'll create a sample data pipeline to illustrate how this could work in practice. We will poll an external data source (Finnhub) for real-time quotes of stocks and ETFs and publish that information to Kafka. Kafka Connect will then grab those records and publish them to a time series database (QuestDB) for analysis.

Prerequisites

  • Git
  • Docker Engine: 20.10+
  • Golang 1.19+
  • Finnhub API token

Setup

To run the example locally, first clone the repo. The codebase is organized into three parts:

  • Golang code is located at the root of the repo.
  • The Dockerfile for the Kafka Connect QuestDB image and the Docker Compose YAML file are under docker.
  • JSON files for the Kafka Connect sinks are under kafka-connect-sinks.

Building the Kafka Connect QuestDB Image

We first need to build the Kafka Connect Docker image with the QuestDB sink connector. Navigate to the docker directory and run docker-compose build. The Dockerfile simply installs the Kafka QuestDB connector via Confluent Hub on top of the Confluent Kafka Connect base image:

FROM confluentinc/cp-kafka-connect-base:7.3.2
RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.6

Start Kafka, Kafka Connect, and QuestDB

Next, we will set up the infrastructure via Docker Compose. From the same docker directory, run Docker Compose in the background:

docker-compose up -d

This will start Kafka and ZooKeeper, our custom Kafka Connect image with the QuestDB connector installed, as well as QuestDB.
The full content of the Docker Compose file is as follows:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.3.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  kafka-connect:
    image: cp-kafka-connect-questdb
    build:
      context: .
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - zookeeper
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  questdb:
    image: questdb/questdb
    hostname: questdb
    container_name: questdb
    ports:
      - "9000:9000"
      - "9009:9009"

Start the QuestDB Kafka Connect Sink

Wait for the Docker containers to become healthy (the kafka-connect container will log a "Finished starting connectors and tasks" message), and then we can create our Kafka Connect sinks. We will create two sinks: one for Tesla (TSLA) and one for SPY (SPDR S&P 500 ETF) to compare the price trends of a volatile stock and the overall market.

From the kafka-connect-sinks directory, issue the following curl command to create the Tesla sink:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-TSLA.json http://localhost:8083/connectors

The sink configuration files look like this (shown here for SPY; the TSLA file differs only in its name and topic):

{
  "name": "questdb-sink-SPY",
  "config": {
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "tasks.max": "1",
    "topics": "topic_SPY",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "host": "questdb",
    "timestamp.field.name": "timestamp"
  }
}

Create the sink for SPY as well:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-SPY.json http://localhost:8083/connectors
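If you prefer scripting to curl, the same Kafka Connect REST calls can be made from Python. This is just an optional sketch; it assumes the requests package is installed and that the two sink JSON files are in the current directory:

import json
import requests

CONNECT_URL = "http://localhost:8083/connectors"

for sink_file in ("questdb-sink-TSLA.json", "questdb-sink-SPY.json"):
    with open(sink_file) as f:
        config = json.load(f)
    # POST the connector config to the Kafka Connect REST API.
    resp = requests.post(CONNECT_URL, json=config, headers={"Accept": "application/json"})
    # 201 means the connector was created; 409 means one with this name already exists.
    print(sink_file, "->", resp.status_code)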
Streaming Real-Time Stock Quotes

Now that we have our data pipeline set up, we are ready to stream real-time stock quotes to Kafka and store them in QuestDB. First, we need to get a free API token from the Finnhub Stock API. Create a free account online, copy the API key, and export it in your shell as FINNHUB_TOKEN:

export FINNHUB_TOKEN=<my-token-here>

The real-time quote endpoint returns various attributes such as the current price, high/low/open quotes, as well as the previous close price. Since we are just interested in the current price, we only grab the price and add the ticker symbol and a timestamp to the Kafka JSON message. The code below grabs the quote every 30 seconds and publishes it to the Kafka topic topic_TSLA:

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type StockData struct {
    Price float64 `json:"c"`
}

type StockDataWithTime struct {
    Symbol    string  `json:"symbol"`
    Price     float64 `json:"price"`
    Timestamp int64   `json:"timestamp"`
}

func main() {
    // Create a new Kafka producer instance
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        panic(fmt.Sprintf("Failed to create producer: %s\n", err))
    }
    defer p.Close()

    for {
        token, found := os.LookupEnv("FINNHUB_TOKEN")
        if !found {
            panic("FINNHUB_TOKEN is undefined")
        }
        symbol := "TSLA"
        url := fmt.Sprintf("https://finnhub.io/api/v1/quote?symbol=%s&token=%s", symbol, token)

        // Retrieve the stock data
        resp, err := http.Get(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        defer resp.Body.Close()

        // Read the response body
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            fmt.Println(err)
            return
        }

        // Unmarshal the JSON data into a struct
        var data StockData
        err = json.Unmarshal(body, &data)
        if err != nil {
            fmt.Println(err)
            return
        }

        // Format data with timestamp
        tsData := StockDataWithTime{
            Symbol:    symbol,
            Price:     data.Price,
            Timestamp: time.Now().UnixNano() / 1000000,
        }
        jsonData, err := json.Marshal(tsData)
        if err != nil {
            fmt.Println(err)
            return
        }

        topic := fmt.Sprintf("topic_%s", symbol)
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          jsonData,
        }, nil)
        if err != nil {
            fmt.Printf("Failed to produce message: %s\n", err)
        }
        fmt.Printf("Message published to Kafka: %s", string(jsonData))

        time.Sleep(30 * time.Second)
    }
}

To start streaming the data, run the code:

$ go run main.go

To also get data for SPY, open another terminal window, change the symbol in the code to SPY, and run it as well with the token value set.

Result

After running the producer code, it prints out the messages it sends to Kafka, for example:

Message published to Kafka: {"symbol":"TSLA","price":174.48,"timestamp":1678743220215}

This data is sent to the Kafka topic topic_TSLA and forwarded to QuestDB via the Kafka Connect sink. We can then navigate to localhost:9000 to access the QuestDB console. Searching for all records in the topic_TSLA table, we can see our real-time market quotes:

SELECT * FROM 'topic_TSLA'

We can also look at SPY data from topic_SPY:

SELECT * FROM 'topic_SPY'

With the data now in QuestDB, we can query for aggregate information by getting the average price over a 2-minute window:

SELECT avg(price), timestamp FROM topic_SPY SAMPLE BY 2m;

Conclusion

Kafka is a trusted component of data pipelines handling large amounts of time series data such as financial data. Kafka can be used to stream mission-critical source data to multiple destinations, including time series databases suited for real-time analytics.
In this article, we created a reference implementation of how to poll real-time market data and use Kafka to stream it to QuestDB via Kafka Connect. For more information on the QuestDB Kafka connector, check out the overview page on the QuestDB website; it covers the configuration details and FAQs on setting it up. The GitHub repo for the connector also has sample projects, including Node.js and Java examples, for those looking to extend this reference architecture.
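If you would rather pull that aggregate from a script than from the web console, QuestDB also exposes its SQL over an HTTP endpoint on port 9000. The sketch below uses the Python requests package and the /exec endpoint; treat the exact response shape as an assumption to verify against the QuestDB documentation for your version:

import requests

QUESTDB_URL = "http://localhost:9000/exec"
query = "SELECT avg(price), timestamp FROM topic_SPY SAMPLE BY 2m;"

# QuestDB's HTTP API accepts the SQL statement as a 'query' parameter
# and returns the result set as JSON.
resp = requests.get(QUESTDB_URL, params={"query": query})
resp.raise_for_status()
result = resp.json()
print(result.get("columns"))
for row in result.get("dataset", []):
    print(row)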

By Yitaek Hwang CORE

Top Big Data Experts


Miguel Garcia

VP of Engineering,
Nextail Labs

Miguel has a great background in leading teams and building high-performance solutions for the retail sector. He is an advocate of platform design as a service and data as a product.

Gautam Goswami

Founder,
DataView

Enthusiastic about learning and sharing knowledge on big data, data science, and related advances, including data streaming platforms, through the knowledge-sharing platform Dataview.in. Presently serving as CTO at Irisidea TechSolutions, Bangalore, India. https://www.irisidea.com/gautam-goswami/

Alexander Eleseev

Full Stack Developer,
First Line Software


Ben Herzberg

Chief Scientist,
Satori

Ben is an experienced hacker, developer, and author, with a background in endpoint security, behavioral analytics, application security, and data security. He has filled roles such as CTO of Cynet and Director of Threat Research at Imperva. Ben is the Chief Scientist at Satori, streamlining data access and security with DataSecOps.

The Latest Big Data Topics

Enriching Kafka Applications With Contextual Data
Kafka is great for event streaming architectures, continuous data integration (ETL), and messaging systems of record (database). Learn how to enhance your Kafka applications.
Updated May 12, 2023
by Fawaz Ghali, PhD
· 31 Views · 1 Like
Non-Volatile Memory
In this article, we will explore the various forms of NVM, their advantages, limitations, and their potential applications.
Updated May 11, 2023
by Aditya Bhuyan
· 1,210 Views · 2 Likes
A Deep Dive Into the Differences Between Kafka and Pulsar
A comparison of Apache Kafka and Pulsar, highlighting unique features and core distinctions, aiming to provide insight into their mechanisms and inform decision-making.
Updated May 11, 2023
by Teng Fu
· 1,542 Views · 2 Likes
Harnessing the Power of MQTT for the Future of IoT
As the use of MQTT in IoT is growing tremendously, we can anticipate seven developing trends in MQTT technology.
Updated May 11, 2023
by Umar Nabi
· 1,847 Views · 1 Like
Have ChatGPT Scrape Your Website
A lot of people want to have ChatGPT scrape their website. In this article, I will explain how our ChatGPT website scraper works.
Updated May 10, 2023
by Thomas Hansen CORE
· 1,645 Views · 1 Like
Developing Metadata-Driven Data Engineering Pipelines Using Apache Spark and Python Dictionary
This article will discuss metadata-driven programming techniques to make our PySpark code more flexible, maintainable, and reusable.
Updated May 10, 2023
by Amlan Patnaik
· 1,802 Views · 2 Likes
Top Data Science Courses to Consider
In this article, we will look at the top 7 data science certification courses, along with their price, mode of education, course curriculum, and other basics.
Updated May 9, 2023
by Sarang S Babu
· 1,299 Views · 1 Like
Ways To Stop and Resume Your Kafka Producer/Consumer at Run-Time
In this blog, the reader will learn how to stop and resume a Kafka client or producer at runtime using two distinct methods.
Updated May 9, 2023
by Ashok Gudise
· 1,997 Views · 1 Like
Understanding Kafka and Event-Driven Architecture [Video Tutorials]
Explore Kafka and Event Driven Architecture, covering key concepts such as consumer groups, topic partitions, replication, and cluster workings.
Updated May 9, 2023
by Ram N
· 1,886 Views · 1 Like
How To Boost AI Model Training With a Distributed Storage System
Learn why distributed storage is important in the AI field and how JuiceFS, an open-source, high-performance distributed file system, can help boost model training.
Updated May 8, 2023
by Changjian Gao
· 1,798 Views · 2 Likes
Preventing Data Loss With Kafka Listeners in Spring Boot
In this article, we'll look at how to build Kafka listeners with Spring Boot and how to use Kafka's acknowledgment mechanisms.
Updated May 8, 2023
by Viacheslav Shago
· 1,736 Views · 3 Likes
Mastering Data Integration for Seamless Cloud Migration: Approaches, Benefits, and Challenges
This article delves into various data integration methodologies for cloud migration and scrutinizes the merits and demerits of each approach.
Updated May 8, 2023
by srinivas Venkata
· 2,569 Views · 1 Like
Impact Of ChatGPT on Software Testing Companies
Explore the blog to understand how ChatGPT's AI could complement the initiatives of software testing companies, along with its limitations.
Updated May 8, 2023
by Hima Pujara
· 1,713 Views · 1 Like
SQL vs. NoSQL: Explained
Two primary database types are generally used for storing digital data: SQL and NoSQL. A quick explanation of the differences between them.
Updated May 8, 2023
by Aniruddh Agarwal
· 1,856 Views · 2 Likes
Write a Smart Contract With ChatGPT, Metamask, Infura, and Truffle
Let’s put ChatGPT to a web3 test and see what kind of smart contract can be created using MetaMask, Infura, and Truffle. Will it be mainnet ready?
Updated May 8, 2023
by John Vester CORE
· 11,568 Views · 1 Like
Spring Boot and Apache Kafka [Video Tutorials]
Explore a video tutorial series regarding Spring Boot exchanging messages with the Apache Kafka server on the Amazon EC2 Instance.
Updated May 6, 2023
by Ram N
· 3,439 Views · 2 Likes
Building a Data Warehouse for Traditional Industry
The best component for you is the one that suits you most. In our case, we don't have too much data to process but want a data platform that is easy to use and maintain.
Updated May 5, 2023
by Herman Seah
· 2,984 Views · 3 Likes
Choosing the Right Azure Storage Service
In this blog post, we will explore the array of services available within Azure Storage: Blobs, Files, Queues, and Tables, and their use cases
Updated May 5, 2023
by Harshvardhan Singh
· 3,419 Views · 1 Like
Integrating Customer 360 Systems With GPT APIs: A Technical Architecture Perspective
Integrate GPT APIs into Customer 360 for natural language processing, data-driven analytics, personalized customer experiences, and AI-powered support solutions.
Updated May 5, 2023
by Prafulla Patil
· 4,058 Views · 2 Likes
Apache Kafka + Apache Flink = Match Made in Heaven
Discover the pros and cons of Apache Kafka for data streaming in combination with Apache Flink (or Kafka Streams) for stream processing.
Updated May 5, 2023
by Kai Wähner CORE
· 5,000 Views · 4 Likes
