A database is a collection of structured data that is stored in a computer system, and it can be hosted on-premises or in the cloud. As databases are designed to enable easy access to data, our resources are compiled here for smooth browsing of everything you need to know from database management systems to database languages.
With the growth of the application modernization demands, monolithic applications were refactored to cloud-native microservices and serverless functions with lighter, faster, and smaller application portfolios for the past years. This was not only about rewriting applications, but the backend data stores were also redesigned in terms of dynamic scalability, high performance, and flexibility for event-driven architecture. For example, traditional data structures in relational databases started to move forward to a new approach that enables to storage and retrieval of key-value and document data structures using NoSQL databases. However, faster modernization presents more challenges for Java developers in terms of steep learning curves about new technologies adoption and retaining current skillsets with experience. For instance, Java developers need to rewrite all existing Java applications to Golang and JavaScript for new serverless functions and learn new APIs or SDKs to process dynamic data records by new modernized serverless applications. This article will take you through a step-by-step tutorial to learn how Quarkus enables Java developers to implement serverless functions on AWS Lambda to process dynamic data on AWS DynamoDB. Quarkus enables developers not only to optimize Java applications for superfast startup time (e.g., milliseconds) and tiny memory footprints (e.g., less than 100 MB) for serverless applications, but developers can also use more than XX AWS extensions to deploy Java applications to AWS Lambda and access AWS DynamoDB directly without steep learning curves. Creating a New Serverless Java Project Using Quarkus We’ll use the Quarkus command to generate a new project with required files such as Maven Wrapper, Dockerfiles, configuration properties, and sample code. Find more information about the benefits of the Quarkus command (CLI) here. Run the following Quarkus command in your working directory. Shell quarkus create piggybank --java=17 You need to use the JDK 17 version since AWS Lambda currently supports JDK 17 as the latest version by default Java runtime (Corretto). Let’s start Quarkus Live Coding, also known as quarkus dev mode, using the following command. Shell cd piggybank && quarkus dev mode Developing Business Logic for Piggybank Now let's add a couple of Quarkus extensions to create a DynamoDB entity and relevant abstract services using the following Quarkus command in the Piggybank directory. Shell quarkus ext add amazon-dynamodb resteasy-reactive-jackson The output should look like this. Java [SUCCESS] ✅ Platform io.quarkus.platform:quarkus-amazon-services-bom has been installed [SUCCESS] ✅ Extension io.quarkiverse.amazonservices:quarkus-amazon-dynamodb has been installed [SUCCESS] ✅ Extension io.quarkus:quarkus-resteasy-reactive-jackson has been installed Creating an Entity Class You will create a new data model (entry.java) file to define Java attributes that map into the fields in DynamoDB. The Java class should look like the following code snippet (you can find the solution in the GitHub repository): Java @RegisterForReflection public class Entry { public Long timestamp; public String accountID; ... public Entry() {} public static Entry from(Map<String, AttributeValue> item) { Entry entry = new Entry(); if (item != null && !item.isEmpty()) { entry.setAccountID(item.get(AbstractService.ENTRY_ACCOUNTID_COL).s()); ... } return entry; } ... } The @RegisterForReflectionannotation instructs Quarkus to keep the class and its members during the native compilation. Find more information here. Creating an Abstract Service Now you will create a new AbstractService.java file to consist of helper methods that prepare DynamoDB to request objects for reading and adding items to the table. The code snippet should look like this (find the solution in the GitHub repository): Java public class AbstractService { public String accountID; ... public static final String ENTRY_ACCOUNTID_COL = "accountID"; ... public String getTableName() { return "finance"; } protected ScanRequest scanRequest() { return ScanRequest.builder().tableName(getTableName()) .attributesToGet(ENTRY_ACCOUNTID_COL, ENTRY_DESCRIPTION_COL, ENTRY_AMOUNT_COL, ENTRY_BALANCE_COL, ENTRY_DATE_COL, ENTRY_TIMESTAMP, ENTRY_CATEGORY).build(); } ... } Adding a Business Layer for REST APIs Create a new EntryService.java file to extend the AbstractService class that will be the business layer of your application. This logic will store and retrieve the entry data from DynamoDB synchronously. The code snippet should look like this (solution in the GitHub repository): Java @ApplicationScoped public class EntryService extends AbstractService { @Inject DynamoDbClient dynamoDB; public List<Entry> findAll() { List<Entry> entries = dynamoDB.scanPaginator(scanRequest()).items().stream() .map(Entry::from) .collect(Collectors.toList()); entries.sort((e1, e2) -> e1.getDate().compareTo(e2.getDate())); BigDecimal balance = new BigDecimal(0); for (Entry entry : entries) { balance = balance.add(entry.getAmount()); entry.setBalance(balance); } return entries; } ... } Creating REST APIs Now you'll create a new EntryResource.java file to implement REST APIs to get and post the entry data from and to DynamoDB. The code snippet should look like the below (solution in the GitHub repository): Java @Path("/entryResource") public class EntryResource { SimpleDateFormat piggyDateFormatter = new SimpleDateFormat("yyyy-MM-dd+HH:mm"); @Inject EntryService eService; @GET @Path("/findAll") public List<Entry> findAll() { return eService.findAll(); } ... } Verify the Business Services Locally First, we need to install a local DynamoDB that the piggy bank services access. There’re a variety of ways to stand up a local DynamoDB such as downloading an executable .jar file, running a container image, and deploying by Apache Maven repository. Today, you will use the Docker compose to install and run DynamoDB locally. Find more information here. Create the following docker-compose.yml file in your local environment. YAML version: '3.8' services: dynamodb-local: command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data" image: "amazon/dynamodb-local:latest" container_name: dynamodb-local ports: - "8000:8000" volumes: - "./docker/dynamodb:/home/dynamodblocal/data" working_dir: /home/dynamodblocal Then, run the following command-line command. Shell docker-compose up The output should look like this. Shell [+] Running 2/2 ⠿ Network quarkus-piggybank_default Created 0.0s ⠿ Container dynamodb-local Created 0.1s Attaching to dynamodb-local dynamodb-local | Initializing DynamoDB Local with the following configuration: dynamodb-local | Port: 8000 dynamodb-local | InMemory: false dynamodb-local | DbPath: ./data dynamodb-local | SharedDb: true dynamodb-local | shouldDelayTransientStatuses: false dynamodb-local | CorsParams: null dynamodb-local | Creating an Entry Table Locally Run the following AWS DynamoDB API command to create a new entry table in the running DynamoDB container. Shell aws dynamodb create-table --endpoint-url http://localhost:8000 --table-name finance --attribute-definitions AttributeName=accountID,AttributeType=S AttributeName=timestamp,AttributeType=N --key-schema AttributeName=timestamp,KeyType=HASH AttributeName=accountID,KeyType=RANGE --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 --table-class STANDARD Adding DynamoDB Clients Configurations DynamoDB clients are configurable in the application.properties programmatically. You also need to add to the classpath a proper implementation of the sync client. By default, the extension uses the java.net.URLConnection HTTP client. Open the pom.xml file and copy the following dependency right after the quarkus-amazon-dynamodb dependency. XML <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>url-connection-client</artifactId> </dependency> Then, add the following key and value to the application.properties to specify your local DynamoDB's endpoint. Java %dev.quarkus.dynamodb.endpoint-override=http://localhost:8000 Starting Quarkus Live Coding Now you should be ready to verify the Piggybank application using Quarkus Dev mode and local DynamoDB. Run the Quarkus Dev mode using the following Quarkus command. Shell quarkus dev The output should end up this. Shell __ ____ __ _____ ___ __ ____ ______ --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \ --\___\_\____/_/ |_/_/|_/_/|_|\____/___/ [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.quarkus xx.xx.xx.) s2023-04-30 21:14:49,824 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [amazon-dynamodb, cdi, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, vertx] -- Tests paused Press [r] to resume testing, [o] Toggle test output, [:] for the terminal, [h] for more options> Run the following curl command to insert several expense items into the piggybank account (entry table). Shell curl -X POST http://localhost:8080/entryResource -H 'Content-Type: application/json' -d '{"accountID": "Food", "description": "Shrimp", "amount": "-20", "balance": "0", "date": "2023-02-01"}' curl -X POST http://localhost:8080/entryResource -H 'Content-Type: application/json' -d '{"accountID": "Car", "description": "Flat tires", "amount": "-200", "balance": "0", "date": "2023-03-01"}' curl -X POST http://localhost:8080/entryResource -H 'Content-Type: application/json' -d '{"accountID": "Payslip", "description": "Income", "amount": "2000", "balance": "0", "date": "2023-04-01"}' curl -X POST http://localhost:8080/entryResource -H 'Content-Type: application/json' -d '{"accountID": "Utilities", "description": "Gas", "amount": "-400", "balance": "0", "date": "2023-05-01"}' Verify the stored data using the following command. Shell curl http://localhost:8080/entryResource/findAll The output should look like this. JSON [{"accountID":"Food","description":"Shrimp","amount":"-20","balance":"-30","date":"2023-02-01"},{"accountID":"Drink","description":"Wine","amount":"-10","balance":"-10","date":"2023-01-01"},{"accountID":"Payslip","description":"Income","amount":"2000","balance":"1770","date":"2023-04-01"},{"accountID":"Car","description":"Flat tires","amount":"-200","balance":"-230","date":"2023-03-01"},{"accountID":"Utilities","description":"Gas","amount":"-400","balance":"1370","date":"2023-05-01"}] You can also find a certain expense based on accountID. Run the following curl command again. Shell curl http://localhost:8080/entryResource/find/Drink The output should look like this. JSON {"accountID":"Drink","description":"Wine","amount":"-10","balance":"-10","date":"2023-01-01"} Conclusion You learned how Quarkus enables developers to write serverless functions that connect NoSQL databases to process dynamic data. To stand up local development environments, you quickly ran the local DynamoDB image using the docker-compose command as well. Quarkus also provide various AWS extensions including amazon-dynamodb to access the AWS cloud services directly from your Java applications. Find more information here. In the next article, you’ll learn how to create a serverless database using AWS DynamoDB and build and deploy your local serverless Java functions to AWS Lambda by enabling SnapStart.
Spark is an analytics engine for large-scale data engineering. Despite its long history, it still has its well-deserved place in the big data landscape. QuestDB, on the other hand, is a time-series database with a very high data ingestion rate. This means that Spark desperately needs data, a lot of it! ...and QuestDB has it, a match made in heaven. Of course, there are pandas for data analytics! The key here is the expression large-scale. Unlike pandas, Spark is a distributed system and can scale really well. What does this mean exactly? Let's take a look at how data is processed in Spark. For the purposes of this article, we only need to know that a Spark job consists of multiple tasks, and each task works on a single data partition. Tasks are executed parallel in stages, distributed on the cluster. Stages have a dependency on the previous ones; tasks from different stages cannot run in parallel. The schematic diagram below depicts an example job: In this tutorial, we will load data from a QuestDB table into a Spark application and explore the inner working of Spark to refine data loading. Finally, we will modify and save the data back to QuestDB. Loading Data to Spark First thing first, we need to load time-series data from QuestDB. I will use an existing table, trades, with just over 1.3 million rows. It contains bitcoin trades spanning over 3 days: not exactly a big data scenario but good enough to experiment. The table contains the following columns: Column Name Column Type symbol SYMBOL side SYMBOL price DOUBLE amount DOUBLE timestamp TIMESTAMP The table is partitioned by day and the timestamp column serves as the designated timestamp. QuestDB accepts connections via Postgres wire protocol, so we can use JDBC to integrate. You can choose from various languages to create Spark applications, and here we will go for Python. Create the script, sparktest.py: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() # print the number of rows print(df.count()) # do some filtering and print the first 3 rows of the data df = df.filter(df.symbol == 'BTC-USD').filter(df.side == 'buy') df.show(3, truncate=False) Believe it or not, this tiny application already reads data from the database when submitted as a Spark job. Shell spark-submit --jars postgresql-42.5.1.jar sparktest.py The job prints the following row count: Shell 1322570 And these are the first 3 rows of the filtered table: Shell +-------+----+--------+---------+--------------------------+ |symbol |side|price |amount |timestamp | +-------+----+--------+---------+--------------------------+ |BTC-USD|buy |23128.72|6.4065E-4|2023-02-01 00:00:00.141334| |BTC-USD|buy |23128.33|3.7407E-4|2023-02-01 00:00:01.169344| |BTC-USD|buy |23128.33|8.1796E-4|2023-02-01 00:00:01.184992| +-------+----+--------+---------+--------------------------+ only showing top 3 rows Although sparktest.py speaks for itself, it is still worth mentioning that this application has a dependency on the JDBC driver located in postgresql-42.5.1.jar. It cannot run without this dependency; hence it has to be submitted to Spark together with the application. Optimizing Data Loading With Spark We have loaded data into Spark. Now we will look at how this was completed and some aspects to consider. The easiest way to peek under the hood is to check QuestDB's log, which should tell us how Spark interacted with the database. We will also make use of the Spark UI, which displays useful insights of the execution, including stages and tasks. Spark Connection to QuestDB: Spark Is Lazy QuestDB log shows that Spark connected three times to the database. For simplicity, I only show the relevant lines in the log: Shell 2023-03-21T21:12:24.031443Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:24.060520Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:12:24.072262Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Spark first queried the database when we created the DataFrame, but as it turns out, it was not too interested in the data itself. The query looked like this: SELECT * FROM trades WHERE 1=0 The only thing Spark wanted to know was the schema of the table in order to create an empty DataFrame. Spark evaluates expressions lazily and only does the bare minimum required at each step. After all, it is meant to analyze big data, so resources are incredibly precious for Spark. Especially memory: data is not cached by default. The second connection happened when Spark counted the rows of the DataFrame. It did not query the data this time, either. Interestingly, instead of pushing the aggregation down to the database by running SELECT count(*) FROM trades, it just queried a 1 for each record: SELECT 1 FROM trades. Spark adds the 1s together to get the actual count. Shell 2023-03-21T21:12:25.692098Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:25.693863Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT 1 FROM trades ] 2023-03-21T21:12:25.695284Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:12:25.749986Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:12:25.800765Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:12:25.881785Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Working with the data itself eventually forced Spark to get a taste of the table's content too. Filters are pushed down to the database by default. Shell 2023-03-21T21:12:26.132570Z I pg-server connected [ip=127.0.0.1, fd=28] 2023-03-21T21:12:26.134355Z I i.q.c.p.PGConnectionContext parse [fd=28, q=SELECT "symbol","side","price","amount","timestamp" FROM trades WHERE ("symbol" IS NOT NULL) AND ("side" IS NOT NULL) AND ("symbol" = 'BTC-USD') AND ("side" = 'buy') ] 2023-03-21T21:12:26.739124Z I pg-server disconnected [ip=127.0.0.1, fd=28, src=queue] We can see that Spark's interaction with the database is rather sophisticated and optimized to achieve good performance without wasting resources. The Spark DataFrame is the key component that takes care of the optimization, and it deserves some further analysis. What Is a Spark DataFrame? The name DataFrame sounds like a container to hold data, but we have seen earlier that this is not really true. So, what is a Spark DataFrame, then? One way to look at Spark SQL, with the risk of oversimplifying it, is that it is a query engine. df.filter(predicate) is really just another way of saying WHERE predicate. With this in mind, the DataFrame is pretty much a query, or actually more like a query plan. Most databases come with functionality to display query plans, and Spark has it too! Let's check the plan for the above DataFrame we just created: Python df.explain(extended=True) Shell == Parsed Logical Plan == Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Analyzed Logical Plan == symbol: string, side: string, price: double, amount: double, timestamp: timestamp Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Optimized Logical Plan == Filter ((isnotnull(symbol#0) AND isnotnull(side#1)) AND ((symbol#0 = BTC-USD) AND (side#1 = buy))) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Physical Plan == *(1) Scan JDBCRelation(trades) [numPartitions=1] [symbol#0,side#1,price#2,amount#3,timestamp#4] PushedFilters: [*IsNotNull(symbol), *IsNotNull(side), *EqualTo(symbol,BTC-USD), *EqualTo(side,buy)], ReadSchema: struct<symbol:string,side:string,price:double,amount:double,timestamp:timestamp> If the DataFrame knows how to reproduce the data by remembering the execution plan, it does not need to store the actual data. This is precisely what we have seen earlier. Spark desperately tried not to load our data, but this can have disadvantages too. Caching Data Not caching the data radically reduces Spark's memory footprint, but there is a bit of jugglery here. Data does not have to be cached because the plan printed above can be executed again and again and again... Now imagine how a mere decently-sized Spark cluster could make our lonely QuestDB instance suffer martyrdom. With a massive table containing many partitions, Spark would generate a large number of tasks to be executed parallel across different nodes of the cluster. These tasks would query the table almost simultaneously, putting a heavy load on the database. So, if you find your colleagues cooking breakfast on your database servers, consider forcing Spark to cache some data to reduce the number of trips to the database. This can be done by calling df.cache(). In a large application, it might require a bit of thinking about what is worth caching and how to ensure that Spark executors have enough memory to store the data. In practice, you should consider caching smallish datasets used frequently throughout the application's life. Let's rerun our code with a tiny modification, adding .cache(): Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() \ .cache() # print the number of rows print(df.count()) # print the first 3 rows of the data df.show(3, truncate=False) This time Spark hit the database only twice. First, it came for the schema, the second time for the data: SELECT "symbol","side","price","amount","timestamp" FROM trades. Shell 2023-03-21T21:13:04.122390Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:04.147353Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:13:04.159470Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] 2023-03-21T21:13:05.873960Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:05.875951Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT "symbol","side","price","amount","timestamp" FROM trades ] 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:13:08.479209Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Clearly, even a few carefully placed .cache() calls can improve the overall performance of an application, sometimes significantly. What else should we take into consideration when thinking about performance? Earlier, we mentioned that our Spark application consists of tasks, which are working on the different partitions of the data parallel. So, partitioned data mean parallelism, which results in better performance. Spark Data Partitioning Now we turn to the Spark UI. It tells us that the job was done in a single task: The truth is that we have already suspected this. The execution plan told us (numPartitions=1) and we did not see any parallelism in the QuestDB logs either. We can display more details about this partition with a bit of additional code: Python from pyspark.sql.functions import spark_partition_id, min, max, count df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-03 23:59:59.801778|1322570 | +-----------+--------------------------+--------------------------+------------------+ The UI helps us confirm that the data is loaded as a single partition. QuestDB stores this data in 3 partitions. We should try to fix this. Although it is not recommended, we can try to use DataFrame.repartition(). This call reshuffles data across the cluster while partitioning the data, so it should be our last resort. After running df.repartition(3, df.timestamp), we see 3 partitions, but not exactly the way we expected. The partitions overlap with one another: Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.698152|2023-02-03 23:59:59.801778|438550 | |1 |2023-02-01 00:00:00.078073|2023-02-03 23:59:57.188894|440362 | |2 |2023-02-01 00:00:00.828943|2023-02-03 23:59:59.309075|443658 | +-----------+--------------------------+--------------------------+------------------+ It seems that DataFrame.repartition() used hashes to distribute the rows across the 3 partitions. This would mean that all 3 tasks would require data from all 3 QuestDB partitions. Let's try this instead: df.repartitionByRange(3, "timestamp"): Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 21:22:35.656399|429389 | |1 |2023-02-01 21:22:35.665599|2023-02-02 21:45:02.613339|470372 | |2 |2023-02-02 21:45:02.641778|2023-02-03 23:59:59.801778|422809 | +-----------+--------------------------+--------------------------+------------------+ This looks better but still not ideal. That is because DaraFrame.repartitionByRange() samples the dataset and then estimates the borders of the partitions. What we really want is for the DataFrame partitions to match exactly the partitions we see in QuestDB. This way, the tasks running parallel in Spark do not cross their way in QuestDB, likely to result in better performance. Data source options are to the rescue! Let's try the following: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .option("partitionColumn", "timestamp") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 23:59:59.664083|487309 | |1 |2023-02-02 00:00:00.188002|2023-02-02 23:59:59.838473|457478 | |2 |2023-02-03 00:00:00.565319|2023-02-03 23:59:59.801778|377783 | +-----------+--------------------------+--------------------------+------------------+ After specifying partitionColumn, numPartitions, lowerBound, and upperBound, the situation is much better: the row counts in the partitions match what we have seen in the QuestDB logs earlier: rowCount=487309, rowCount=457478 and rowCount=377783. Looks like we did it! Shell 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] We can check Spark UI again; it also confirms that the job was completed in 3 separate tasks, each of them working on a single partition. Sometimes it might be tricky to know the minimum and maximum timestamps when creating the DataFrame. In the worst case, you could query the database for those values via an ordinary connection. We have managed to replicate our QuestDB partitions in Spark, but data does not always come from a single table. What if the data required is the result of a query? Can we load that, and is it possible to partition it? Options to Load Data: SQL Query vs. Table We can use the query option to load data from QuestDB with the help of a SQL query: Python # 1-minute aggregated trade data df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("query", "SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR") \ .load() Depending on the amount of data and the actual query, you might find that pushing the aggregations to QuestDB is faster than completing it in Spark. Spark definitely has an edge when the dataset is really large. Now let's try partitioning this DataFrame with the options used before with the option dbtable. Unfortunately, we will get an error message: Shell Options 'query' and 'partitionColumn' can not be specified together. However, we can trick Spark by just giving the query an alias name. This means we can go back to using the dbtable option again, which lets us specify partitioning. See the example below: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.ts), max(df.ts), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+-------------------+-------------------+------------------+ |partitionId|min(ts) |max(ts) |count(partitionId)| +-----------+-------------------+-------------------+------------------+ |0 |2023-02-01 00:00:00|2023-02-01 23:59:00|1440 | |1 |2023-02-02 00:00:00|2023-02-02 23:59:00|1440 | |2 |2023-02-03 00:00:00|2023-02-03 23:59:00|1440 | +-----------+-------------------+-------------------+------------------+ Looking good. Now it seems that we can load any data from QuestDB into Spark by passing a SQL query to the DataFrame. Do we, really? Our trades table is limited to three data types only. What about all the other types you can find in QuestDB? We expect that Spark will successfully map a double or a timestamp when queried from the database, but what about a geohash? It is not that obvious what is going to happen. As always, when unsure, we should test. Type Mappings I have another table in the database with a different schema. This table has a column for each type currently available in QuestDB. SQL CREATE TABLE all_types ( symbol SYMBOL, string STRING, char CHAR, long LONG, int INT, short SHORT, byte BYTE, double DOUBLE, float FLOAT, bool BOOLEAN, uuid UUID, --long128 LONG128, long256 LONG256, bin BINARY, g5c GEOHASH(5c), date DATE, timestamp TIMESTAMP ) timestamp (timestamp) PARTITION BY DAY; INSERT INTO all_types values('sym1', 'str1', 'a', 456, 345, 234, 123, 888.8, 11.1, true, '9f9b2131-d49f-4d1d-ab81-39815c50d341', --to_long128(10, 5), rnd_long256(), rnd_bin(10,20,2), rnd_geohash(35), to_date('2022-02-01', 'yyyy-MM-dd'), to_timestamp('2022-01-15T00:00:03.234', 'yyyy-MM-ddTHH:mm:ss.SSS')); long128 is not fully supported by QuestDB yet, so it is commented out. Let's try to load and print the data; we can also take a look at the schema of the DataFrame: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) Much to my surprise, Spark managed to create the DataFrame and mapped all types. Here is the schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DoubleType(), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', TimestampType(), True), StructField('timestamp', TimestampType(), True) ]) It looks pretty good, but you might wonder if it is a good idea to map long256 and geohash types to String. QuestDB does not provide arithmetics for these types, so it is not a big deal. Geohashes are basically 32-base numbers, represented and stored in their string format. The 256-bit long values are also treated as string literals. long256 is used to store cryptocurrency private keys. Now let's see the data: Shell +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double|float|bool|uuid | +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8 |11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+-------------------+-----------------------+ |g5c |date |timestamp | +-----+-------------------+-----------------------+ |q661k|2022-02-01 00:00:00|2022-01-15 00:00:03.234| +-----+-------------------+-----------------------+ It also looks good, but we could omit the 00:00:00 from the end of the date field. We can see that it is mapped to Timestamp and not Date. We could also try to map one of the numeric fields to Decimal. This can be useful if later we want to do computations that require high precision. We can use the customSchema option to customize the column types. Our modified code: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .option("customSchema", "date DATE, double DECIMAL(38, 10)") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) The new schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DecimalType(38,10), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', DateType(), True), StructField('timestamp', TimestampType(), True) ]) And the data is displayed as: Shell +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double |float|bool|uuid | +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8000000000|11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+----------+-----------------------+ |g5c |date |timestamp | +-----+----------+-----------------------+ |q661k|2022-02-01|2022-01-15 00:00:03.234| +-----+----------+-----------------------+ It seems that Spark can handle almost all database types. The only issue is long128, but this type is a work in progress currently in QuestDB. When completed, it will be mapped as String, just like long256. Writing Data Back Into the Database There is only one thing left: writing data back into QuestDB. In this example, first, we will load some data from the database and add two new features: 10-minute moving average standard deviation, also calculated over the last 10-minute window Then we will try to save the modified DataFrame back into QuestDB as a new table. We need to take care of some type mappings as Double columns are sent as FLOAT8 to QuestDB by default, so we end up with this code: Python from pyspark.sql import SparkSession from pyspark.sql.window import Window from pyspark.sql.functions import avg, stddev, when # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() # add new features window_10 = Window.partitionBy(df.symbol).rowsBetween(-10, Window.currentRow) df = df.withColumn("ma10", avg(df.mid).over(window_10)) df = df.withColumn("std", stddev(df.mid).over(window_10)) df = df.withColumn("std", when(df.std.isNull(), 0.0).otherwise(df.std)) # save the data as 'trades_enriched' df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save() All works but…we soon realize that our new table, trades_enriched is not partitioned and does not have a designated timestamp, which is not ideal. Obviously Spark has no idea of QuestDB specifics. It would work better if we created the table upfront and Spark only saved the data into it. We drop the table and re-create it; this time, it is partitioned and has a designated timestamp: SQL DROP TABLE trades_enriched; CREATE TABLE trades_enriched ( volume DOUBLE, mid DOUBLE, ts TIMESTAMP, ma10 DOUBLE, std DOUBLE ) timestamp (ts) PARTITION BY DAY; The table is empty and waiting for the data. We rerun the code; all works, no complaints. The data is in the table, and it is partitioned. One aspect of writing data into the database is if we are allowed to create duplicates. What if I try to rerun the code again without dropping the table? Will Spark let me save the data this time? No, we get an error: Shell pyspark.sql.utils.AnalysisException: Table or view 'trades_enriched' already exists. SaveMode: ErrorIfExists. The last part of the error message looks interesting: SaveMode: ErrorIfExists. What is SaveMode? It turns out we can configure what should happen if the table already exists. Our options are: errorifexists: the default behavior is to return an error if the table already exists, Spark is playing safe here append : data will be appended to the existing rows already present in the table overwrite: the content of the table will be replaced entirely by the newly saved data ignore: if the table is not empty, our save operation gets ignored without any error We have already seen how errorifexists behaves, append and ignore seem to be simple enough just to work. However, overwrite is not straightforward. The content of the table must be cleared before the new data can be saved. Spark will delete and re-create the table by default, which means losing partitioning and the designated timestamp. In general, we do not want Spark to create tables for us. Luckily, with the truncate option we can tell Spark to use TRUNCATE to clear the table instead of deleting it: Python # save the data as 'trades_enriched', overwrite if already exists df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("truncate", True) \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save(mode="overwrite") The above works as expected. Conclusion Our ride might seem bumpy, but we finally have everything working. Our new motto should be "There is a config option for everything!". To summarize what we have found: We can use Spark's JDBC data source to integrate with QuestDB. It is recommended to use the dbtable option, even if we use a SQL query to load data. Always try to specify partitioning options (partitionColumn, numPartitions, lowerBound and upperBound) when loading data, partitions ideally should match with the QuestDB partitions. Sometimes it makes sense to cache some data in Spark to reduce the number of trips to the database. It can be beneficial to push work down into the database, depending on the task and how much data is involved. It makes sense to make use of QuestDB's time-series-specific features, such as SAMPLE BY, instead of trying to rewrite it in Spark. Type mappings can be customized via the customSchema option when loading data. When writing data into QuestDB always specify the desired saving mode. Generally works better if you create the table upfront and do not let Spark create it, because this way you can add partitioning and designated timestamp. If selected the overwrite saving mode, you should enable the truncate option too to make sure Spark does not delete the table; hence partitioning and the designated timestamp will not get lost. Type mappings can be customized via the createTableColumnTypes option when saving data. I mentioned only the config options which are most likely to be tweaked when working with QuestDB; the complete set of options can be found here: Spark data source options. What Could the Future Bring? Overall everything works, but it would be nice to have a much more seamless way of integration, where partitioning would be taken care of automagically. Some type mappings could use better defaults, too, when saving data into QuestDB. The overwrite saving mode could default to use TRUNCATE. More seamless integration is not impossible to achieve. If QuestDB provided its own JDBCDialect implementation for Spark, the above nuances could be handled. We should probably consider adding this. Finally, there is one more thing we did not mention yet, data locality. That is because, currently QuestDB cannot run as a cluster. However, we are actively working on a solution — check out The Inner Workings of Distributed Databases for more information. When the time comes, we should ensure that data locality is also considered. Ideally, each Spark node would work on tasks that require partitions loaded from the local (or closest) QuestDB instance. However, this is not something we should be concerned about at this moment... for now, just enjoy data crunching!
In this blog post, you will be using the aws-lambda-go library along with the AWS Go SDK v2 for an application that will process records from an Amazon SNS topic and store them in a DynamoDB table. You will also learn how to use Go bindings for AWS CDK to implement “Infrastructure-as-code” for the entire solution and deploy it with the AWS CDK CLI. The code is available on GitHub. Introduction Amazon Simple Notification Service (SNS) is a highly available, durable, and scalable messaging service that enables the exchange of messages between applications or microservices. It uses a publish/subscribe model where publishers send messages to topics, and subscribers receive messages from topics they are interested in. Clients can subscribe to the SNS topic and receive published messages using a supported endpoint type, such as Amazon Kinesis Data Firehose, Amazon SQS, AWS Lambda, HTTP, email, mobile push notifications, and mobile text messages (SMS). AWS Lambda and Amazon SNS integration enable developers to build event-driven architectures that can scale automatically and respond to changes in real time. When a new message is published to an SNS topic, it can trigger a Lambda function (Amazon SNS invokes your function asynchronously with an event that contains a message and metadata) which can perform a set of actions, such as processing the message, storing data in a database, sending emails or SMS messages, or invoking other AWS services. Prerequisites Before you proceed, make sure you have the Go programming language (v1.18 or higher) and AWS CDK installed. Clone the project and change it to the right directory: Shell git clone https://github.com/abhirockzz/sns-lambda-events-golang cd sns-lambda-events-golang Use CDK To Deploy the Solution To start the deployment, simply invoke cdk deploy and wait for a bit. You will see a list of resources that will be created and will need to provide your confirmation to proceed. Shell cd cdk cdk deploy # output Bundling asset SNSLambdaGolangStack/sns-function/Code/Stage... ✨ Synthesis time: 5.94s This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening). Please confirm you intend to make the following modifications: //.... omitted Do you wish to deploy these changes (y/n)? y This will start creating the AWS resources required for our application. If you want to see the AWS CloudFormation template which will be used behind the scenes, run cdk synth and check the cdk.out folder. You can keep track of the progress in the terminal or navigate to AWS console: CloudFormation > Stacks > SNSLambdaGolangStack Once all the resources are created, you can try out the application. You should have: A Lambda function A SNS topic A DynamoDB table Along with a few other components (like IAM roles etc.) Verify the Solution You can check the table and SNS info in the stack output (in the terminal or the Outputs tab in the AWS CloudFormation console for your Stack): Send few messages to the SNS topic. For the purposes of this demo, you can use the AWS CLI: Shell export SNS_TOPIC_ARN=<enter the queue url from cloudformation output> aws sns publish --topic-arn $SNS_TOPIC_ARN --message "user1@foo.com" --message-attributes 'name={DataType=String, StringValue="user1"}, city={DataType=String,StringValue="seattle"}' aws sns publish --topic-arn $SNS_TOPIC_ARN --message "user2@foo.com" --message-attributes 'name={DataType=String, StringValue="user2"}, city={DataType=String,StringValue="new delhi"}' aws sns publish --topic-arn $SNS_TOPIC_ARN --message "user3@foo.com" --message-attributes 'name={DataType=String, StringValue="user3"}, city={DataType=String,StringValue="new york"}' You can also use the AWS console to send SQS messages. Check the DynamoDB table to confirm that the file metadata has been stored. You can use the AWS console or the AWS CLI aws dynamodb scan --table-name <enter the table name from cloudformation output> Don’t Forget To Clean Up Once you’re done, to delete all the services, simply use: Shell cdk destroy #output prompt (choose 'y' to continue) Are you sure you want to delete: SQSLambdaGolangStack (y/n)? You were able to setup and try the complete solution. Before we wrap up, let’s quickly walk through some of important parts of the code to get a better understanding of what’s going the behind the scenes. Code Walk Through Some of the code (error handling, logging etc.) has been omitted for brevity since we only want to focus on the important parts. CDK You can refer to the CDK code here. We start by creating a DynamoDB table: Shell table := awsdynamodb.NewTable(stack, jsii.String("dynamodb-table"), &awsdynamodb.TableProps{ PartitionKey: &awsdynamodb.Attribute{ Name: jsii.String("email"), Type: awsdynamodb.AttributeType_STRING}, }) table.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY) Then, we handle the Lambda function (CDK will take care of building and deploying the function) and make sure we provide it appropriate permissions to write to the DynamoDB table. Shell function := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("sns-function"), &awscdklambdagoalpha.GoFunctionProps{ Runtime: awslambda.Runtime_GO_1_X(), Environment: &map[string]*string{"TABLE_NAME": table.TableName()}, Entry: jsii.String(functionDir), }) table.GrantWriteData(function) Then, we create the SNS topic and add that as an event source to the Lambda function. Shell snsTopic := awssns.NewTopic(stack, jsii.String("sns-topic"), nil) function.AddEventSource(awslambdaeventsources.NewSnsEventSource(snsTopic, nil)) Finally, we export the SNS topic and DynamoDB table name as CloudFormation outputs. Shell awscdk.NewCfnOutput(stack, jsii.String("sns-topic-name"), &awscdk.CfnOutputProps{ ExportName: jsii.String("sns-topic-name"), Value: snsTopic.TopicName()}) awscdk.NewCfnOutput(stack, jsii.String("dynamodb-table-name"), &awscdk.CfnOutputProps{ ExportName: jsii.String("dynamodb-table-name"), Value: table.TableName()}) Lambda Function You can refer to the Lambda Function code here. The Lambda function handler iterates over each SNS topic, and for each of them: Stores the message body in the primary key attribute (email) of the DynamoDB table Rest of the message attributes are stored as is. Shell func handler(ctx context.Context, snsEvent events.SNSEvent) { for _, record := range snsEvent.Records { snsRecord := record.SNS item := make(map[string]types.AttributeValue) item["email"] = &types.AttributeValueMemberS{Value: snsRecord.Message} for attrName, attrVal := range snsRecord.MessageAttributes { fmt.Println(attrName, "=", attrVal) attrValMap := attrVal.(map[string]interface{}) dataType := attrValMap["Type"] val := attrValMap["Value"] switch dataType.(string) { case "String": item[attrName] = &types.AttributeValueMemberS{Value: val.(string)} } } _, err := client.PutItem(context.Background(), &dynamodb.PutItemInput{ TableName: aws.String(table), Item: item, }) } } Wrap Up In this blog, you saw an example of how to use Lambda to process messages sent to SNS and store them in DynamoDB, thanks to the SNS and Lamdba integration. The entire infrastructure life-cycle was automated using AWS CDK. All this was done using the Go programming language, which is well-supported in DynamoDB, AWS Lambda, and AWS CDK. Happy building!
One of the more notable aspects of ChatGPT is its engine, which not only powers the web-based chatbot but can also be integrated into your Java applications. Whether you prefer reading or watching, let’s review how to start using the OpenAI GPT engine in your Java projects in a scalable way, by sending prompts to the engine only when necessary: Budget Journey App Imagine you want to visit a city and have a specific budget in mind. How should you spend the money and make your trip memorable? This is an excellent question to delegate to the OpenAI engine. Let’s help users get the most out of their trips by building a simple Java application called BudgetJourney. The app can suggest multiple points of interest within a city, tailored to fit specific budget constraints. The architecture of the BudgetJourney app looks as follows: The users open a BudgetJourney web UI that runs on Vaadin. Vaadin connects to a Spring Boot backend when users want to get recommendations for a specific city and budget. Spring Boot connects to a YugabyteDB database instance to check if there are already any suggestions for the requested city and budget. If the data is already in the database, the response is sent back to the user. Otherwise, Spring Boot connects to the OpenAI APIs to get recommendations from the neural network. The response is stored in YugabyteDB for future reference and sent back to the user. Now, let’s see how the app communicates with the Open AI engine (step 4) and how using the database (step 3) makes the solution scalable and cost-effective. OpenAI Java Library The OpenAI engine can be queried via the HTTP API. You need to create an account, get your token (i.e., API key) and use that token while sending requests to one of the OpenAI models. A model in the context of OpenAI is a computational construct trained on a large dataset to recognize patterns, make predictions, or perform specific tasks based on input data. Presently, the service supports several models that can understand and generate natural language, code, images, or convert audio into text. Our BudgetJourney app uses the GPT-3.5 model which understands and generates natural language or code. The app asks the model to suggest several points of interest within a city while considering budget constraints. The model then returns the suggestions in a JSON format. The open-source OpenAI Java library implements the GPT-3.5 HTTP APIs, making it easy to communicate with the service via well-defined Java abstractions. Here’s how you get started with the library: Add the latest OpenAI Java artifact to your pom.xml file. XML <dependency> <groupId>com.theokanning.openai-gpt3-java</groupId> <artifactId>service</artifactId> <version>${version}</version> </dependency> Create an instance of the OpenAiService class by providing your token and a timeout for requests between the app and OpenAI engine. Java OpenAiService openAiService = new OpenAiService( apiKey, Duration.ofSeconds(apiTimeout)); Easy! Next, let’s see how you can work with the GPT-3.5 model via the OpenAiService instance. Sending Prompts to GPT-3.5 Model You communicate with the OpenAI models by sending text prompts that tell what you expect a model to do. The model behaves best when your instructions are clear and include examples. To build a prompt for the GPT-3.5 model, you use the ChatCompletionRequest API of the OpenAI Java library: Java ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest .builder() .model(“gpt-3.5-turbo”) .temperature(0.8) .messages( List.of( new ChatMessage("system", SYSTEM_TASK_MESSAGE), new ChatMessage("user", String.format("I want to visit %s and have a budget of %d dollars", city, budget)))) .build(); model(“gpt-3.5-turbo”) is an optimized version of the GPT-3.5 model. temperature(...) controls how much randomness and creativity to expect in a model’s response. For instance, higher values like 0.8 will make the output more random, while lower values like 0.2 will make it more deterministic. messages(...) are the actual instructions or prompts to the model. There are “system” messages that instruct the model to behave a certain way, “assistant” messages that store previous responses, and “user” messages that carry user requests with asks. The SYSTEM_TASK_MESSAGE of the BudgetJourney app looks as follows: You are an API server that responds in a JSON format. Don't say anything else. Respond only with the JSON. The user will provide you with a city name and available budget. While considering that budget, you must suggest a list of places to visit. Allocate 30% of the budget to restaurants and bars. Allocate another 30% to shows, amusement parks, and other sightseeing. Dedicate the remainder of the budget to shopping. Remember, the user must spend 90-100% of the budget. Respond in a JSON format, including an array named 'places'. Each item of the array is another JSON object that includes 'place_name' as a text, 'place_short_info' as a text, and 'place_visit_cost' as a number. Don't add anything else after you respond with the JSON. Although wordy and in need of optimization, this system message conveys the desired action: to suggest multiple points of interest with maximal budget utilization and to provide the response in JSON format, which is essential for the rest of the application. Once you created the prompt (ChatCompletionRequest) providing both the system and user messages as well as other parameters, you can send it via the OpenAiService instance: Java OpenAiService openAiService = … //created earlier StringBuilder builder = new StringBuilder(); openAiService.createChatCompletion(chatCompletionRequest) .getChoices().forEach(choice -> { builder.append(choice.getMessage().getContent()); }); String jsonResponse = builder.toString(); The jsonResponse object is then further processed by the rest of the application logic which prepares a list of points of interest and displays them with the help of Vaadin. For example, suppose a user is visiting Tokyo and wants to spend up to $900 in the city. The model will strictly follow our instructions from the system message and respond with the following JSON: JSON { "places": [ { "place_name": "Tsukiji Fish Market", "place_short_info": "Famous fish market where you can eat fresh sushi", "place_visit_cost": 50 }, { "place_name": "Meiji Shrine", "place_short_info": "Beautiful Shinto shrine in the heart of Tokyo", "place_visit_cost": 0 }, { "place_name": "Shibuya Crossing", "place_short_info": "Iconic pedestrian crossing with bright lights and giant video screens", "place_visit_cost": 0 }, { "place_name": "Tokyo Skytree", "place_short_info": "Tallest tower in the world, offering stunning views of Tokyo", "place_visit_cost": 30 }, { "place_name": "Robot Restaurant", "place_short_info": "Unique blend of futuristic robots, dancers, and neon lights", "place_visit_cost": 80 }, // More places ]} This JSON is then converted into a list of different points of interest. It is then shown to the user: NOTE: The GPT-3.5 model was trained on the Sep 2021 data set. Therefore, it can’t provide 100% accurate and relevant trip recommendations. However, this inaccuracy can be improved with the help of OpenAI plugins that give models access to real-time data. For instance, once the Expedia plugin for OpenAI becomes publicly available as an API, this will let you improve this BudgetJourney app further. Scaling With a Database As you can see, it’s straightforward to integrate the neural network into your Java applications and communicate with it in a way similar to other 3rd party APIs. You can also tune the API behavior, such as adding a desired output format. But, this is still a 3rd party API that charges you for every request. The more prompts you send and the longer they are, the more you pay. Nothing comes for free. Plus, it takes time for the model to process your prompts. For instance, it can take 10-30 seconds before the BudgetJourney app receives a complete list of recommendations from OpenAI. This might be overkill, especially if different users send similar prompts. To make OpenAI GPT applications scalable, it’s worth storing the model responses in a database. That database allows you to: Reduce the volume of requests to the OpenAI API and, therefore, the associated costs. Serve user requests with low latency by returning previously processed (or preloaded) recommendations from the database. The BudgetJourney app uses the YugabyteDB database due to its ability to scale globally and store the model responses close to the user locations. With the geo-partitioned deployment mode, you can have a single database cluster with the data automatically pinned to and served from various geographies with low latency. A custom geo-partitioning column (the “region” column in the picture above) lets the database decide on a target row location. For instance, the database nodes from Europe already store recommendations for a trip to Miami on a $1500 budget. Next, suppose a user from Europe wants to go to Miami and spend that amount. In that case, the application can respond within a few milliseconds by getting the recommendations straight from the database nodes in the same geography. The BudgetJourney app uses the following JPA repository to get recommendations from the YugabyteDB cluster: Java @Repository public interface CityTripRepository extends JpaRepository<CityTrip, Integer> { @Query("SELECT pointsOfInterest FROM CityTrip WHERE cityName=?1 and budget=?2 and region=?3") String findPointsOfInterest(String cityName, Integer budget, String region); } With an Entity class looking as follows: Java @Entity public class CityTrip { @Id @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "landmark_generator") @SequenceGenerator(name = "landmark_generator", sequenceName = "landmark_sequence", allocationSize = 5) int id; @NotEmpty String cityName; @NotNull Integer budget; @NotEmpty @Column(columnDefinition = "text") String pointsOfInterest; @NotEmpty String region; //The rest of the logic } So, all you need to do is to make a call to the database first, then revert to the OpenAI API if relevant suggestions are not yet available in the database. As your application increases in popularity, more and more local recommendations will be available, making this approach even more cost-effective over time. Wrapping Up A ChatGPT web-based chatbot is an excellent way to demonstrate the OpenAI engine’s capabilities. Explore the engine’s powerful models and start building new types of Java applications. Just make sure you do it in a scalable way!
Previous Articles on CockroachDB CDC Using CockroachDB CDC with Azure Event Hubs Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry SaaS Galore: Integrating CockroachDB with Confluent Kafka, Fivetran, and Snowflake CockroachDB CDC using Minio as a cloud storage sink CockroachDB CDC using Hadoop Ozone S3 Gateway as a cloud storage sink Motivation Apache Pulsar is a cloud-native distributed messaging and streaming platform. In my customer conversations, it most often comes up when compared to Apache Kafka. I have a customer needing a Pulsar sink support as they rely on Pulsar's multi-region capabilities. CockroachDB does not have a native Pulsar sink; however, the Pulsar project supports Kafka on Pulsar protocol support, and that's the core of today's article. This tutorial assumes you have an enterprise license, you can also leverage our managed offerings where enterprise changefeeds are enabled by default. I am going to demonstrate the steps using a Docker environment instead. High-Level Steps Deploy Apache Pulsar Deploy a CockroachDB cluster with enterprise changefeeds Deploy a Kafka Consumer Verify Conclusion Step-By-Step Instructions Deploy Apache Pulsar Since I'm using Docker, I'm relying on the KoP Docker Compose environment provided by the Stream Native platform, which spearheads the development of Apache Pulsar. I've used the service taken from the KoP example almost as-is aside from a few differences: pulsar: container_name: pulsar hostname: pulsar image: streamnative/sn-pulsar:2.11.0.5 command: > bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw" # disable stream storage and functions worker environment: allowAutoTopicCreationType: partitioned brokerDeleteInactiveTopicsEnabled: "false" PULSAR_PREFIX_messagingProtocols: kafka PULSAR_PREFIX_kafkaListeners: PLAINTEXT://pulsar:9092 PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor PULSAR_PREFIX_webServicePort: "8088" ports: - 6650:6650 - 8088:8088 - 9092:9092 I removed PULSAR_PREFIX_kafkaAdvertisedListeners: PLAINTEXT://127.0.0.1:19092 as I don't need it, I also changed the exposed port for - 19092:9092 to - 9092:9092. My PULSAR_PREFIX_kafkaListeners address points to a Docker container with the hostname pulsar. I will need to access the address from other containers and I can't rely on the localhost. I'm also using a more recent version of the image than the one in their docs. Deploy a CockroachDB Cluster With Enterprise Changefeeds I am using a 3-node cluster in Docker. If you've followed my previous articles, you should be familiar with it. I am using Flyway to set up the schema and seed the tables. The actual schema and data are taken from the changefeed examples we have in our docs. The only difference is I'm using a database called example. To enable CDC we need to execute the following commands: SET CLUSTER SETTING cluster.organization = '<organization name>'; SET CLUSTER SETTING enterprise.license = '<secret>'; SET CLUSTER SETTING kv.rangefeed.enabled = true; Again, if you don't have an enterprise license, you won't be able to complete this tutorial. Feel free to use our Dedicated or Serverless instances if you want to follow along. Finally, after the tables and the data are in place, we can create a changefeed on these tables. CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://pulsar:9092'; Here I am using the Kafka port and the address of the Pulsar cluster, in my case pulsar. job_id ---------------------- 855538618543276035 (1 row) NOTICE: changefeed will emit to topic office_dogs NOTICE: changefeed will emit to topic employees Time: 50ms total (execution 49ms / network 1ms) Everything seems to work and changefeed does not error out. Deploy a Kafka Consumer To validate data is being written to Pulsar, we need to stand up a Kafka client. I've created an image that downloads and installs Kafka. Once the entire Docker Compose environment is running, we can access the client and run the console consumer to verify. /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic office_dogs --from-beginning {"after": {"id": 1, "name": "Petee H"} {"after": {"id": 2, "name": "Carl"} If we want to validate that new data is flowing, let's insert another record into CockroachDB: INSERT INTO office_dogs VALUES (3, 'Test'); The consumer will print a new row: {"after": {"id": 3, "name": "Test"} Since we've created two topics, let's now look at the employees topic. /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic employees --from-beginning {"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 855539880336523267} {"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 855539880336654339} Similarly, let's update a record and see the changes propagate to Pulsar. UPDATE employees SET employee_name = 'Spencer Kimball' WHERE dog_id = 2; {"after": {"dog_id": 2, "employee_name": "Spencer Kimball", "rowid": 855539880336654339} Verify We've confirmed we can produce messages to Pulsar topics using the Kafka protocol via KoP. We've also confirmed we can consume using the Kafka console consumer. We can also use the native Pulsar tooling to confirm the data is consumable from Pulsar. I installed the Pulsar Python client, pip install pulsar-client, on the Kafka client machine and created a Python script with the following code: import pulsar client = pulsar.Client('pulsar://pulsar:6650') consumer = client.subscribe('employees', subscription_name='my-sub') while True: msg = consumer.receive() print("Received message: '%s'" % msg.data()) consumer.acknowledge(msg) client.close() I execute the script: root@kafka-client:/opt/kafka# python3 consume_messages.py 2023-04-11 14:17:21.761 INFO [281473255101472] Client:87 | Subscribing on Topic :employees 2023-04-11 14:17:21.762 INFO [281473255101472] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000 2023-04-11 14:17:21.762 INFO [281473255101472] ConnectionPool:97 | Created connection for pulsar://pulsar:6650 2023-04-11 14:17:21.763 INFO [281473230237984] ClientConnection:388 | [172.28.0.3:33826 -> 172.28.0.6:6650] Connected to broker 2023-04-11 14:17:21.771 INFO [281473230237984] HandlerBase:72 | [persistent://public/default/employees-partition-0, my-sub, 0] Getting connection from pool 2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000 2023-04-11 14:17:21.776 INFO [281473230237984] ConnectionPool:97 | Created connection for pulsar://localhost:6650 2023-04-11 14:17:21.776 INFO [281473230237984] ClientConnection:390 | [172.28.0.3:33832 -> 172.28.0.6:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650 2023-04-11 14:17:21.786 INFO [281473230237984] ConsumerImpl:238 | [persistent://public/default/employees-partition-0, my-sub, 0] Created consumer on broker [172.28.0.3:33832 -> 172.28.0.6:6650] 2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:274 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0 2023-04-11 14:17:21.786 INFO [281473230237984] MultiTopicsConsumerImpl:137 | Successfully Subscribed to Topics Let's insert a record into the employees tables: INSERT INTO employees (dog_id, employee_name) VALUES (3, 'Test'); UPDATE employees SET employee_name = 'Artem' WHERE dog_id = 3; The Pulsar client output is as follows: Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}'' Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}'' Conclusion This is how you can leverage existing CockroachDB capability with non-standard services like Apache Pulsar. Hopefully, you've found this article useful and can start leveraging the existing Kafka sink with non-standard message brokers.
Apache Kafka is a battle-tested distributed stream-processing platform popular in the financial industry to handle mission-critical transactional workloads. Kafka’s ability to handle large volumes of real-time market data makes it a core infrastructure component for trading, risk management, and fraud detection. Financial institutions use Kafka to stream data from market data feeds, transaction data, and other external sources to drive decisions. A common data pipeline to ingest and store financial data involves publishing real-time data to Kafka and utilizing Kafka Connect to stream that to databases. For example, the market data team may continuously update real-time quotes for a security to Kafka, and the trading team may consume that data to make buy/sell orders. Processed market data and orders may then be saved to a time series database for further analysis. In this article, we’ll create a sample data pipeline to illustrate how this could work in practice. We will poll an external data source (FinnHub) for real-time quotes of stocks and ETFs, and publish that information to Kafka. Kafka Connect will then grab those records and publish them to a time series database (QuestDB) for analysis. Prerequisites Git Docker Engine: 20.10+ Golang 1.19+ FinnHub API Token Setup To run the example locally, first clone the repo. The codebase is organized into three parts: Golang code is located at the root of the repo. Dockerfile for the Kafka Connect QuestDB image and the Docker Compose YAML file is under docker. JSON files for Kafka Connect sinks are under kafka-connect-sinks. Building the Kafka Connect QuestDB Image We first need to build the Kafka Connect docker image with QuestDB Sink connector. Navigate to the docker directory and run docker-compose build. The Dockerfile is simply installing the Kafka QuestDB Connector via Confluent Hub on top of the Confluent Kafka Connect base image: FROM confluentinc/cp-kafka-connect-base:7.3.2 RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.6 Start Kafka, Kafka Connect, and QuestDB Next, we will set up the infrastructure via Docker Compose. From the same docker directory, run Docker Compose in the background: docker-compose up -d This will start Kafka + Zookeeper, our custom Kafka Connect image with the QuestDB Connector installed, as well as QuestDB. The full content of the Docker Compose file is as follows: --- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.3.2 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-server:7.3.2 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' kafka-connect: image: cp-kafka-connect-questdb build: context: . hostname: connect container_name: connect depends_on: - broker - zookeeper ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_REST_PORT: 8083 CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter questdb: image: questdb/questdb hostname: questdb container_name: questdb ports: - "9000:9000" - "9009:9009" Start the QuestDB Kafka Connect Sink Wait for the Docker containers to be healthy (the kafka-connect image will log “Finished starting connectors and tasks” message), and we can create our Kafka Connect sinks. We will create two sinks: one for Tesla and one for SPY (SPDR S&P 500 ETF) to compare price trends of a volatile stock and the overall market. Issue the following curl command to create the Tesla sink within the kafka-connect-sinks directory: curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-TSLA.json http://localhost:8083/connectors The JSON file it posts contains the following configurations. { "name": "questdb-sink-SPY", "config": { "connector.class":"io.questdb.kafka.QuestDBSinkConnector", "tasks.max":"1", "topics": "topic_SPY", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "host": "questdb", "timestamp.field.name": "timestamp" } } Create the sink for SPY as well: curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-SPY.json http://localhost:8083/connectors Streaming Real Time Stock Quotes Now that we have our data pipeline set up, we are ready to stream real-time stock quotes to Kafka and store them in QuestDB. First, we need to get a free API token from Finnhub Stock API. Create a free account online and copy the API key. Export that key to our shell under FINNHUB_TOKEN : export FINNHUB_TOKEN=<my-token-here> The real-time quote endpoint returns various attributes such as the current price, high/low/open quotes, as well as previous close price. Since we are just interested in the current price, we only grab the price and add the ticket symbol and timestamp to the Kafka JSON message. The code below will grab the quote every 30 seconds and publish to the Kafka topic: topic_TSLA . package main import ( "encoding/json" "fmt" "io/ioutil" "net/http" "os" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) type StockData struct { Price float64 `json:"c"` } type StockDataWithTime struct { Symbol string `json:"symbol"` Price float64 `json:"price"` Timestamp int64 `json:"timestamp"` } func main() { // Create a new Kafka producer instance p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}) if err != nil { panic(fmt.Sprintf("Failed to create producer: %s\n", err)) } defer p.Close() for { token, found := os.LookupEnv("FINNHUB_TOKEN") if !found { panic("FINNHUB_TOKEN is undefined") } symbol := "TSLA" url := fmt.Sprintf("https://finnhub.io/api/v1/quote?symbol=%s&token=%s", symbol, token) // Retrieve the stock data resp, err := http.Get(url) if err != nil { fmt.Println(err) return } defer resp.Body.Close() // Read the response body body, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println(err) return } // Unmarshal the JSON data into a struct var data StockData err = json.Unmarshal(body, &data) if err != nil { fmt.Println(err) return } // Format data with timestamp tsData := StockDataWithTime{ Symbol: symbol, Price: data.Price, Timestamp: time.Now().UnixNano() / 1000000, } jsonData, err := json.Marshal(tsData) if err != nil { fmt.Println(err) return } topic := fmt.Sprintf("topic_%s", symbol) err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: jsonData, }, nil) if err != nil { fmt.Printf("Failed to produce message: %s\n", err) } fmt.Printf("Message published to Kafka: %s", string(jsonData)) time.Sleep(30 * time.Second) } } To start streaming the data, run the code: $ go run main.go To also get data for SPY, open up another terminal window, modify the code for the symbol to SPY and run the code as well with the token value set. Result After running the producer code, it will print out messages that it sends to Kafka like: Message published to Kafka: {“symbol”:”TSLA”,”price”:174.48,”timestamp”:1678743220215} . This data is sent to the Kafka topic topic_TSLA and sent to QuestDB via the Kafka Connect sink. We can then navigate to localhost:9000 to access the QuestDB console. Searching for all records in the topic_TSLA table, we can see our real-time market quotes: SELECT * FROM ‘topic_TSLA’ We can also look at SPY data from topic_SPY : SELECT * FROM ‘topic_SPY’ With the data now in QuestDB, we can query for aggregate information by getting the average price over a 2m window: SELECT avg(price), timestamp FROM topic_SPY SAMPLE BY 2m; Conclusion Kafka is a trusted component of data pipelines handling large amounts of time series data such as financial data. Kafka can be used to stream mission-critical source data to multiple destinations, including time series databases suited for real-time analytics. In this article, we created a reference implementation of how to poll real-time market data and use Kafka to stream that to QuestDB via Kafka Connect. For more information on the QuestDB Kafka connector, check out the overview page on the QuestDB website. It lists more information on the configuration details and FAQs on setting it up. The GitHub repo for the connector also has sample projects including a Node.js and Java example for those looking to extend this reference architecture.
In this article, I’ll explain how to use database hooks in your Node.js applications to solve specific problems that might arise in your development journey. Many applications require little more than establishing a connection pool between a server, database, and executing queries. However, depending on your application and database deployments, additional configurations might be necessary. For example, multi-region distributed SQL databases can be deployed with different topologies depending on the application use case. Some topologies require setting properties on the database on a per-session basis. Let’s explore some of the hooks made available by some of the most popular database clients and ORMs in the Node.js ecosystem. Laying the Foundation The Node.js community has many drivers to choose from when working with the most popular relational databases. Here, I’m going to focus on PostgreSQL-compatible database clients, which can be used to connect to YugabyteDB or another PostgreSQL database. Sequelize, Prisma, Knex and node-postgres are popular clients with varying feature sets depending on your needs. I encourage you to read through their documentation to determine which best suits your needs. These clients come with hooks for different use cases. For instance: Connection hooks: Execute a function immediately before or after connecting and disconnecting from your database. Logging hooks: Log messages to stdout at various log levels. Lifecycle hooks: Execute a function immediately before or after making calls to the database. In this article, I’ll cover some of the hooks made available by these clients and how you can benefit from using them in your distributed SQL applications. I’ll also demonstrate how to use hooks to hash a user's password before creation and how to set runtime configuration parameters after connecting to a multi-region database with read replicas. Sequelize The Sequelize ORM has a number of hooks for managing the entire lifecycle of your database transactions. The beforeCreate lifecycle hook can be used to hash a password before creating a new user: JavaScript User.beforeCreate(async (user, options) => { const hashedPassword = await hashPassword(user.password); user.password = hashedPassword; }); Next, I’m using the afterConnect connection hook to set session parameters. With this YugabyteDB deployment, you can execute reads from followers to reduce latencies, and eliminate the need to read from the primary cluster nodes: JavaScript const config = { host: process.env.DB_HOST, port: 5433, dialect: "postgres", dialectOptions: { ssl: { require: true, rejectUnauthorized: true, ca: [CERTIFICATE], }, }, pool: { max: 5, min: 1, acquire: 30000, idle: 10000, }, hooks: { async afterConnect(connection) { if (process.env.DB_DEPLOYMENT_TYPE === "multi_region_with_read_replicas") { await connection.query("set yb_read_from_followers = true; set session characteristics as transaction read only;"); } }, }, }; const connection = new Sequelize( process.env.DATABASE_NAME, process.env.DATABASE_USER, process.env.DATABASE_PASSWORD, config ); By using this hook, each database session in the connection pool will set these parameters upon establishing a new connection: set yb_read_from_followers = true;: This parameter controls whether or not reading from followers is enabled. set session characteristics as transaction read only;: This parameter applies the read-only setting to all statements and transaction blocks that follow. Prisma Despite being the ORM of choice for many in the Node.js community, at the time of writing, Prisma doesn’t contain many of the built-in hooks found in Sequelize. Currently, the library contains hooks to handle the query lifecycle, logging, and disconnecting, but offers no help before or after establishing connections. Here’s how you can use Prisma’s lifecycle middleware to hash a password before creating a user: JavaScript prisma.$use(async (params, next) => { if (params.model == 'User' && params.action == 'create') { params.args.data.password = await hashPassword(params.args.data.password); } return next(params) }) const create = await prisma.user.create({ data: { username: 'bhoyer', password: 'abc123' }, }) To set session parameters to make use of our read replicas, we’ll have to execute a statement before querying our database: JavaScript await prisma.$executeRaw(`set yb_read_from_followers = true; set session characteristics as transaction read only;`); const users = await prisma.user.findMany(); If you need to immediately establish a connection in your connection pool to set a parameter, you can connect explicitly with Prisma to forgo the lazy connection typical of connection pooling. Prisma has the log levels of query , error, info, and warn. Queries can be handled as events using event-based logging: JavaScript const prisma = new PrismaClient({ log: [ { emit: 'event', level: 'query', }, { emit: 'stdout', level: 'error', }, { emit: 'stdout', level: 'info', }, { emit: 'stdout', level: 'warn', }, ], }); prisma.$on('query', (e) => { console.log('Query: ' + e.query); console.log('Params: ' + e.params); console.log('Duration: ' + e.duration + 'ms'); }); This can be helpful in development when working on query tuning in a distributed system. Here’s how you can make use of the beforeExit hook to access the database before exiting: JavaScript const prisma = new PrismaClient(); prisma.$on('beforeExit', async () => { // PrismaClient still available await prisma.issue.create({ data: { message: 'Connection exiting.' }, }) }); Knex Knex is a lightweight query builder, but it does not have the query middleware found in more full-featured ORMs. To hash a password, you can process this manually using a custom function: JavaScript async function handlePassword(password) { const hashedPassword = await hashPassword(password); return hashedPassword; } const password = await handlePassword(params.password); knex('users').insert({...params, password}); The syntax required to achieve a connection hook in the Knex.js query builder is similar to that of Sequelize. Here’s how we can set our session parameters to read from YugabyteDB’s replica nodes: JavaScript const knex = require('knex')({ client: 'pg', connection: {/*...*/}, pool: { afterCreate: function (connection, done) { connection.query('set yb_read_from_followers = true; set session characteristics as transaction read only;', function (err) { if (err) { //Query failed done(err, conn); } else { console.log("Reading from replicas."); done(); } }); } } }); node-postgres The node-postgres library is the most low-level of all of the libraries discussed. Under the hood, the Node.js EventEmitter is used to emit connection events. A connect event is triggered when a new connection is established in the connection pool. Let’s use it to set our session parameters. I’ve also added an error hook to catch and log all error messages: JavaScript const config = { user: process.env.DB_USER, host: process.env.DB_HOST, password: process.env.DB_PASSWORD, port: 5433, database: process.env.DB_NAME, min: 1, max: 10, idleTimeoutMillis: 5000, connectionTimeoutMillis: 5000, ssl: { rejectUnauthorized: true, ca: [CERTIFICATE], servername: process.env.DB_HOST, } }; const pool = new Pool(config); pool.on("connect", (c) => { c.query("set yb_read_from_followers = true; set session characteristics as transaction read only;"); }); pool.on("error", (e) => { console.log("Connection error: ", e); }); There aren’t any lifecycle hooks at our disposal with node-postgres, so hashing our password will have to be done manually, like with Prisma: JavaScript async function handlePassword(password) { const hashedPassword = await hashPassword(password); return hashedPassword; } const password = await handlePassword(params.password); const user = await pool.query('INSERT INTO user(username, password) VALUES ($1, $2) RETURNING *', [params.username, password]); Wrapping Up As you can see, hooks can solve a lot of the problems previously addressed by complicated and error-prone application code. Each application has a different set of requirements and brings new challenges. You might go years before you need to utilize a particular hook in your development process, but now, you’ll be ready when that day comes. Look out for more from me on Node.js and distributed application development. Until then, keep on coding!
This is how PostgreSQL's official documentation defines Indexes, pretty simply and clearly — "Indexes are a common way to enhance database performance. An index allows the database server to find and retrieve specific rows much faster than it could do without an index. But indexes also add overhead to the database system as a whole, so they should be used sensibly." The last word, "sensibly," in this definition, is the crux of this article. Indexes are good, and they enhance the performance of queries. This does not imply that we create Indexes for every query and every column. It is important to remember that while Indexes do enhance Performance, they do need to be maintained, and that is overhead. While working with PostgreSQL Performance, below are some IMPORTANT takeaways I found for PostgreSQL Indexes that will help design Indexes that enhance Performance, where the Performance gains outweigh the maintenance overhead: 1. When deploying New/Modified Indexes to Production Environment, use the CONCURRENTLY option with the CREATE Index command. This will allow “Writes” on the Database to proceed seamlessly. There are conditions under which this command can be used and needs to be monitored closely during deployment as well, as failure in deployment may result in an invalid index which would need to be removed manually. 2. Indexes with multiple columns in their definition must be used as sparingly as possible. The PostgreSQL Planner uses the Leading column as the Index for the major “Filtering Criteria,” hence the presence of other columns in the definition though will be used for inequality comparison or for fetching data, is mostly more of a maintenance overhead than a performance benefit. 3. We can design Indexes with a “WHERE CLAUSE” in the Index Definition, called partial indexes. Saves on both space and time but needs to be used very carefully, only when we are absolutely sure that the condition will either be directly or arithmetically connected to the query, or else the index can become a maintenance overhead with no performance benefit. 4. If your workload includes a mix of queries that sometimes involve only column x, sometimes only column y, and sometimes both x and y, you might choose to create two separate indexes on x and y, relying on index combinations to process the queries that use both columns. This would be a better approach than creating a multi-column index with x and y in above mentioned scenario. 5. We can create indexes on expressions of columns, such as (lower (col1)) ;(( first_name || ' ' || last_name)), etc. Index expressions are relatively expensive to maintain because the derived expression(s) must be computed for each row upon insertion and whenever it is updated. Indexes on expressions are useful when retrieval speed is more important than insertion and update speed. 6. PostgreSQL supports index-only scans, which can answer queries from an index alone without any need for random heap access, given that it is primarily a B-tree index, and the query must reference only columns stored in the index. It will be a win only if a significant fraction of the table's heap pages have their all-visible map bits set. But tables in which a large fraction of the rows are unchanging are common enough to make this type of scan very useful in practice. 7. To make effective use of the index-only scan feature, you might choose to create a covering index, which is an index specifically designed to include the columns needed by a particular type of query that you run frequently. Since queries typically need to retrieve more columns than just the ones they search on, PostgreSQL allows you to create an index in which some columns are just “payload” and are not part of the search key. This is done by adding an INCLUDE clause listing the extra columns. 8. It's wise to be conservative about adding non-key payload columns to an index, especially wide columns. If an index tuple exceeds the maximum size allowed for the index type, data insertion will fail. In any case, non-key columns duplicate data from the index's table and bloat the size of the index, thus potentially slowing searches. And remember that there is little point in including payload columns in an index unless the table changes slowly enough that an index-only scan is likely to not need to access the heap. If the heap tuple must be visited anyway, it costs nothing more to get the column's value from there. 9. Suffix truncation removes non-key columns from upper B-Tree levels. As payload columns, they are never used to guide index scans. The truncation process also removes one or more trailing key column(s) when the remaining prefix of key column(s) happens to be sufficient to describe tuples on the lowest B-Tree level. In practice, covering indexes without an INCLUDE clause often avoid storing columns that are effectively payload in the upper levels. However, explicitly defining payload columns as non-key columns reliably keeps the tuples in upper levels small. 10. In principle, index-only scans can be used with expression indexes. However, PostgreSQL's planner is currently not very smart about such cases. It considers a query to be potentially executable by index-only scan only when all columns needed by the query are available from the index. For example, for a query searching on f(x), x is not needed except in the context f(x), but the planner does not notice that and concludes that an index-only scan is not possible. If an index-only scan seems sufficiently worthwhile, this can be worked around by adding x as an included column. Partial indexes also support index-only scans. 11. An index can support only one collation per index column. If multiple collations are of interest, multiple indexes may be needed. The index automatically uses the collation of the underlying column. 12. Always run ANALYZE first before examining index usage. This command collects statistics about the distribution of the values in the table. This information is required to estimate the number of rows returned by a query, which is needed by the planner to assign realistic costs to each possible query plan. In absence of any real statistics, some default values are assumed, which are almost certain to be inaccurate. 13. It is fatal to use very small test data sets to check index usage. While selecting 1,000 out of 100,000 rows could be a candidate for an index, selecting 1 out of 100 rows will hardly be because the 100 rows probably fit within a single disk page, and there is no plan that can beat sequentially fetching 1 disk page. Do evaluate each Index designed for each of the above points and proceed with an Informed Index Design Process. Also, I do need to point this out, for that migrating from SQL Server to PostgreSQL, please do not carry the Indexes from SQL Server to PostgreSQL as is. The architecture of the two software for Indexes is very, very different. So, re-evaluate and re-design Indexes when moving to PostgreSQL. It will be an effort, but it will be worth your while.
A streaming database is a type of database that is designed specifically to process large amounts of real-time streaming data. Unlike traditional databases, which store data in batches before processing, a streaming database processes data as soon as it is generated, allowing for real-time insights and analysis. Unlike traditional stream processing engines that do not persist data, a streaming database can store data and respond to user data access requests. Streaming databases are ideal for latency-critical applications such as real-time analytics, fraud detection, network monitoring, and the Internet of Things (IoT) and can simplify the technology stack. Brief History The concept of a streaming database was first introduced in academia in 2002. A group of researchers from Brown, Brandeis, and MIT pointed out the demand for managing data streams inside databases and built the first streaming database, Aurora. A few years later, the technology was adopted by large enterprises. The top three database vendors, Oracle, IBM, and Microsoft, consecutively launched their stream processing solutions known as Oracle CQL, IBM System S, and Microsoft SQLServer StreamInsight. Instead of developing a streaming database from scratch, these vendors have directly integrated stream processing functionality into their existing databases. Since the late 2000s, developers inspired by MapReduce have separated stream processing functionality from database systems and developed large-scale stream processing engines, including Apache Storm, Apache Samza, Apache Flink, and Apache Spark Streaming. These systems were designed to continuously process ingested data streams and deliver results to downstream systems. However, compared to streaming databases, stream processing engines do not store data and, therefore, cannot serve user-initiated ad-hoc queries. Streaming databases keep evolving in parallel with stream processing engines. Two streaming databases, PipelineDB and KsqlDB, were developed in the 2010s and were popular then. In the early 2020s, a few cloud-based streaming databases, like RisingWave, Materialize, and DeltaStream, emerged. These products aim to provide users with streaming database services in the cloud. To achieve that objective, the focus is on designing an architecture that fully utilizes resources on the cloud to achieve unlimited horizontal scalability and supreme cost efficiency. Typical Use Cases Real-time applications need streaming databases. Streaming databases are well-suited for real-time applications that demand up-to-date results with a freshness requirement ranging from sub-seconds to minutes. Applications like IoT and network monitoring require sub-second latency, and latency requirements for applications like ad recommendations, stock dashboarding, and food delivery can range from hundreds of milliseconds to several minutes. Streaming databases continuously deliver results at low latency and can be a good fit for these applications. Some applications are not freshness sensitive and can tolerate delays of tens of minutes, hours, or even days. Some representative applications include hotel reservations and inventory tracking. In these cases, users may consider using either streaming databases or traditional batch-based databases. They should decide based on other factors, such as cost efficiency, flexibility, and tech stack complexity. Streaming databases are commonly used alongside other data systems in real-time applications to facilitate two classic types of use cases: streaming ingestion (ETL) and streaming analytics. Streaming databases are commonly integrated with other modern data systems to facilitate two types of use cases: streaming ingestion (ETL) and streaming analytics. Streaming Ingestion (ETL) Streaming ingestion provides a continuous flow of data from one set of systems to another. Developers can use a streaming database to clean streaming data, join multiple streams, and move the joined results into downstream systems in real time. In real-world scenarios, data ingested into the streaming databases typically come from OLTP databases, messaging queues, or storage systems. After processing, the results are most likely to be dumped back into these systems or inserted into data warehouses or data lakes. Streaming Analytics Streaming analytics focuses on performing complex computations and delivering fresh results on-the-fly. Data typically comes from OLTP databases, message queues, and storage systems in the streaming analytics scenario. Results are usually ingested into a serving system to support user-triggered requests. A streaming database can also serve queries on its own. Users can connect a streaming database directly with a BI tool to visualize results. With the growing demand for real-time machine learning, streaming databases have also become a crucial tool for enabling agile feature engineering. By utilizing streaming databases to store transformed data as features, developers can respond promptly to changing data patterns and new events. Streaming databases allow for real-time ingestion, processing, and transformation of data into meaningful features that can enhance the accuracy and efficiency of machine learning models while also reducing data duplication and improving data quality. This empowers organizations to make faster and more informed decisions, optimize their machine-learning workflows, and gain a competitive advantage. Streaming Databases vs. Traditional Databases Difference between a streaming database and a traditional database. Traditional databases are designed to store large amounts of batch data and provide fast, consistent access to that data through transactions and queries. They are often optimized for processing complex operations, such as aggregations and joins, that manipulate the data in bulk. Traditional databases’ execution models are often referred to as Human-Active, DBMS-Passive (HADP) models. That is, a traditional database passively stores data, and queries actively initiated by humans trigger computations. Examples of traditional databases include OLTP databases like MySQL and PostgreSQL and OLAP databases like DuckDB and ClickHouse. Streaming databases, on the other hand, are designed to incrementally process a large volume of continuously ingested data on-the-fly, and provide low-latency access to the data and results for further processing and analysis. They are optimized for processing data as soon as it arrives rather than bulk processing after data is persisted. Streaming databases’ execution models are often called DBMS-active, Human-Passive (DAHP) models. A streaming database actively triggers computation as data comes in, and humans passively receive results from the database. Examples of streaming databases include PipelineDB, KsqlDB, and RisingWave. Streaming Databases vs. OLTP Databases An OLTP database is ACID-compliant and can process concurrent transactions. In contrast, a streaming database does not guarantee ACID compliance and, therefore, cannot be used to support transactional workloads. In terms of data correctness, streaming databases enforce consistency and completeness. A well-designed streaming database should guarantee the following two properties: Exactly-once semantics, meaning that every single data event will be processed once and only once, even if a system failure occurs. Out-of-order processing means that users can enforce a streaming database to process data events in a predefined order, even if data events arrive out of order. Streaming Databases vs. OLAP Databases An OLAP database is optimized for efficiently answering user-initiated analytical queries. OLAP databases typically implement columnar stores and a vectorized execution engine to accelerate complex query processing over large amounts of data. OLAP databases are best suited for use cases where interactive queries are essential. Different from OLAP databases, streaming databases focus more on the resulting freshness, and they use an incremental computation model to optimize latency. Streaming databases typically do not adopt column stores but may implement vectorized execution for query processing. Conclusion In conclusion, a streaming database is an essential system for organizations that require real-time insights from large amounts of data. By providing real-time processing, scalability, and reliability, a streaming database can help organizations make better decisions, identify opportunities, and respond to threats in real time.
When it comes to managing large amounts of data in a distributed system, Apache Cassandra and Apache Pulsar are two names that often come up. Apache Cassandra is a highly scalable NoSQL database that excels at handling high-velocity writes and queries across multiple nodes. It is an ideal solution for use cases such as user profile management, product catalogs, and real-time analytics. A platform for distributed messaging and streaming, called Apache Pulsar, was created to manage moving data. It can handle standard messaging workloads and more complex streaming use cases including real-time data processing and event-driven architectures. This article covers the main steps of building a Spring Boot and React-based web application that interacts with Pulsar and Cassandra, displaying stock data live as it is received. This is not a complete tutorial, it only covers the most important steps. You can find the complete source code for the application on GitHub. You will learn how to: Set up Cassandra and Pulsar instances using DataStax Astra DB and Astra Streaming. Publish and consume Pulsar messages in a Spring Boot application. Store Pulsar messages in Cassandra using a sink. Viewing live and stored data in React using the Hilla framework by Vaadin. Used Technologies and Libraries Apache Cassandra (with Astra DB) Apache Pulsar (with Astra Streaming) Spring Boot Spring for Apache Pulsar Spring Data for Apache Cassandra React Hilla AlphaVantage API Requirements Java 17 or newer Node 18 or newer Intermediate Java skills and familiarity with Spring Boot Storing Sensitive Data in Spring Boot Much of the setup for Cassandra and Pulsar is configuration-based. While it might be tempting to put the configuration in application.properties, it is not a smart idea as the file is under source control, and you may unintentionally reveal secrets. Instead, create a local config/local/application.properties configuration file and add it to .gitignore to ensure it does not leave your computer. The settings from the local configuration file will be automatically applied by Spring Boot: mkdir -p config/local touch config/local/application.properties echo " # Contains secrets that shouldn't go into the repository config/local/" >> .gitignore You may provide Spring Boot with the options as environment variables when using it in production. Setting Up Cassandra and Pulsar Using DataStax Astra Both Apache technologies used in this article are open-source projects and can be installed locally. However, using cloud services to set up the instances is a simpler option. In this article, we set up the data infrastructure required for our example web application using DataStax free tier services. Begin by logging in to your existing account or signing up for a new one on Astra DataStax’s official website, where you will be required to create the database and streaming service separately. Cassandra Setup Start by clicking “Create Database” from the official Astra DataStax website. Sinking data from a stream into Astra DB requires that both services are deployed in a region that supports both Astra Streaming and Astra DB: Enter the name of your new database instance. Select the keyspace name. (A keyspace stores your group of tables, a bit like schema in relational databases). Select a cloud Provider and Region.Note: For the demo application to work, you need to deploy the database service on a region that supports streaming too. Select “Create Database.” Cassandra: Connecting to the Service Once the initialization of the database service is created, you need to generate a token and download the “Secure Connection Bundle” that encrypts the data transfer between the app and the cloud database (mTLS). Navigate to the DB dashboard “Connect” tab sheet where you will find the button to generate a one-time token (please remember to download it) and the bundle download button: spring.cassandra.schema-action=CREATE_IF_NOT_EXISTS spring.cassandra.keyspace-name=<KEYSPACE_NAME> spring.cassandra.username=<ASTRADB_TOKEN_CLIENT_ID> spring.cassandra.password=<ASTRADB_TOKEN_SECRET> # Increase timeouts when connecting to Astra from a dev workstation spring.cassandra.contact-points=<ASTRADB_DATACENTER_ID> spring.cassandra.port=9042 spring.cassandra.local-datacenter=<ASTRADB_REGION> datastax.astra.secure-connect-bundle=<secure-connect-astra-stock-db.zip> Pulsar parameters for application.properties. Pulsar Set Up Start by clicking “Create Stream” from the main Astra DataStax page: Enter the name for your new streaming instance. Select a provider and region.Note: Remember to use the same provider and region you used to create the database service. Select “Create Stream.” Pulsar: Enabling Auto Topic Creation In addition to getting the streaming service up and running, you will also need to define the topic that is used by the application to consume and produce messages. You can create a topic explicitly using UI, but a more convenient way is to enable “Allow Auto Topic Creation” setting for the created instance: Click on the newly created stream instance and navigate to the “Namespace and Topics” tab sheet, and click “Modify Namespace.” Navigate to the “Settings” tab located under the default namespace (not the top-level “Settings” tab) and scroll all the way down. Change the “Allow Topic Creation” to “Allow Auto Topic Creation.” Changing this default setting will allow the application to create new topics automatically without any additional admin effort in Astra. With this, you have successfully established the infrastructure for hosting your active and passive data. Pulsar: Connecting to the Service Once the streaming instance has been set up, you need to create a token to access the service from your app. Most of the necessary properties are located on the “Connect” tab sheet of the “Streaming dashboard.” The “topic-name” input is found in the “Namespaces and Topics” tab sheet: ## Client spring.pulsar.client.service-url=<Broker Service URL> spring.pulsar.client.auth-plugin-class-name=org.apache.pulsar.client.impl.auth.AuthenticationToken spring.pulsar.client.authentication.token=<Astra_Streaming_Token> ## Producer spring.pulsar.producer.topic-name=persistent://<TENANT_NAME>/default/<TOPIC_NAME> spring.pulsar.producer.producer-name=<name of your choice> ## Consumer spring.pulsar.consumer.topics=persistent://<TENANT_NAME>/default/<TOPIC_NAME> spring.pulsar.consumer.subscription-name=<name of your choice> spring.pulsar.consumer.consumer-name=<name of your choice> spring.pulsar.consumer.subscription-type=key_shared Pulsar parameters for application.properties. Publishing Pulsar Messages From Spring Boot The Spring for Apache Pulsar library takes care of setting up Pulsar producers and consumers based on the given configuration. In the application, the StockPriceProducer component handles message publishing. To fetch stock data, it makes use of an external API call before publishing it to a Pulsar stream using a PulsarTemplate. Autowire the PulsarTemplate into the class and save it to a field: Java @Component public class StockPriceProducer { private final PulsarTemplate<StockPrice> pulsarTemplate; public StockPriceProducer(PulsarTemplate<StockPrice> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } //... } Then use it to publish messages: Java private void publishStockPrices(Stream<StockPrice> stockPrices) { // Publish items to Pulsar with 100ms intervals Flux.fromStream(stockPrices) // Delay elements for the demo, don't do this in real life .delayElements(Duration.ofMillis(100)) .subscribe(stockPrice -> { try { pulsarTemplate.sendAsync(stockPrice); } catch (PulsarClientException e) { throw new RuntimeException(e); } }); } You need to configure the schema for the custom StockPrice type. In Application.java, define the following bean: Java @Bean public SchemaResolver.SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() { return (schemaResolver) -> schemaResolver.addCustomSchemaMapping(StockPrice.class, Schema.JSON(StockPrice.class)); } Consuming Pulsar Messages in Spring Boot The Spring for Apache Pulsar library comes with a @PulsarListener annotation for a convenient way of listening to Pulsar messages. Here, the messages are emitted to a Project Reactor Sink so the UI can consume them as a Flux: Java @Service public class StockPriceConsumer { private final Sinks.Many<StockPrice> stockPriceSink = Sinks.many().multicast().directBestEffort(); private final Flux<StockPrice> stockPrices = stockPriceSink.asFlux(); @PulsarListener private void stockPriceReceived(StockPrice stockPrice) { stockPriceSink.tryEmitNext(stockPrice); } public Flux<StockPrice> getStockPrices() { return stockPrices; } } Creating a Server Endpoint for Accessing Data From React The project uses Hilla, a full-stack web framework for Spring Boot. It manages websocket connections for reactive data types and allows type-safe server communication. The client may utilize the matching TypeScript methods created by the StockPriceEndpoint to fetch data: Java @Endpoint @AnonymousAllowed public class StockPriceEndpoint { private final StockPriceProducer producer; private final StockPriceConsumer consumer; private final StockPriceService service; StockPriceEndpoint(StockPriceProducer producer, StockPriceConsumer consumer, StockPriceService service) { this.producer = producer; this.consumer = consumer; this.service = service; } public List<StockSymbol> getSymbols() { return StockSymbol.supportedSymbols(); } public void produceDataForTicker(String ticker) { producer.produceStockPriceData(ticker); } public Flux<StockPrice> getStockPriceStream() { return consumer.getStockPrices(); } public List<StockPrice> findAllByTicker(String ticker) { return service.findAllByTicker(ticker); } } Displaying a Live-Updating Chart in React The DashboardView has an Apex Chart candle stick chart for displaying the stock data. It’s bound to a state of type ApexAxisChartSeries: TypeScript const [series, setSeries] = useState<ApexAxisChartSeries>([]); The view uses a React effect hook to call the server endpoint and subscribe to new data. It returns a disposer function to close the websocket when it is no longer needed: TypeScript useEffect(() => { const subscription = StockPriceEndpoint .getStockPriceStream() .onNext((stockPrice) => updateSeries(stockPrice)); return () => subscription.cancel(); }, []); The series is bound to the template. Because the backend and frontend are reactive, the chart is automatically updated any time a new Pulsar message is received: HTML <ReactApexChart type="candlestick" options={options} series={series} height={350} ></div> Persisting Pulsar Messages to Cassandra Sinking Pulsar messages to Astra DB can be useful in scenarios where you need a reliable, scalable, and secure platform to store event data from Pulsar for further analysis, processing, or sharing. Perhaps you need to retain a copy of event data for compliance and auditing purposes, need to store event data from multiple tenants in a shared database, or for some other use case. Astra Streaming offers numerous fully-managed Apache Pulsar connectors you can use to persist event data to various databases and third party solutions, like Snowflake. In this article, we are persisting the stream data into Astra DB. Creating a Sink Start by selecting the “Sink” tab sheet from the Astra streaming dashboard. Select the “default” namespace: From the list of available “Sink Types,” choose “Astra DB.” Give the sink a name of your choice Select the “stock-feed” that will be available once you have published messages to that topic from your app. After selecting data stream input, select the database you want to persist pulsar messages: To enable table creation, paste the Astra DB token with valid roles. You’ll notice keyspaces after the entry of a valid token, choose the keyspace name that was used to create the database. Then enter the table name.Note: This needs to match the @Table("stock_price") annotation value you use in StockPrice.java class to read back the data. Next, you need to map the properties from the Pulsar message to the database table column. Property fields are automatically mapped in our demo application, so you can simply click “Create” to proceed. If you were, for instance, persisting a portion of the data to the database, opening the schema definition would enable you to view the property names employed and create a custom mapping between the fields. After the sink is created, the initialization process will begin. After which, the status will change to “active.” Then, you’re done with automatically persisting stock data into your database for easy access by application. The sink dashboard provides access to sink log files in the event of an error. Displaying Cassandra Data in a Table The historical data that is stored in Cassandra are displayed in a data grid component. The DetailsView contains a Vaadin Grid component that is bound to an array of StockPrice objects which are kept in a state variable: TypeScript const [stockData, setStockData] = useState<StockPrice[]>([]); The view has a dropdown selector for selecting the stock you want to view. When the selection is updated, the view fetches the data for that stock from the server endpoint: TypeScript async function getDataFor(ticker?: string) { if (ticker) setStockData(await StockPriceEndpoint.findAllByTicker(ticker)); } The StockData array is bound to the grid in the template. GridColumns define the properties that columns should map to: HTML <Grid items={stockData} className="flex-grow"> <GridColumn path="time" ></GridColumn> <GridColumn path="open" ></GridColumn> <GridColumn path="high" ></GridColumn> <GridColumn path="low" ></GridColumn> <GridColumn path="close" ></GridColumn> <GridColumn path="volume" ></GridColumn> </Grid> Conclusion In this article, we showed how you can build a scalable real-time application using an open-source Java stack. You can clone the completed application and use it as a base for your own experiments.
Oren Eini
Wizard,
Hibernating Rhinos @ayende
Abhishek Gupta
Principal Developer Advocate,
AWS
Artem Ervits
Solutions Engineer,
Cockroach Labs
Sahiti Kappagantula
Product Associate,
Oorwin