It’s been a busy few months, and we are happy to announce the release of Kùzu 0.7.0! This release includes important performance and scalability improvements at the very core of the system along with some user-facing features. This blog post will highlight some key features that will make your experience with Kùzu even better when working with large graphs!
TL;DR: Key highlights of this release
There have been some key performance improvements in this release:
- New and much faster recursive path finding algorithms that implement relationship patterns with the Kleene star (`*`).
- Data spilling to disk during `COPY FROM`, which enables copying very large graphs on machines with limited RAM.
- Zone maps, which enable much faster scans of node/rel properties when there are filters on numeric properties.
We’ve also made significant strides on the scalability front: you can now scalably load and query graphs with billions of edges (see our microbenchmarks on LDBC1000 and Graph500-30 below, which respectively contain 2.2 billion and 17 billion edges).
Among the user-facing features, we have the following additions:
- CSV auto detection to automatically detect several CSV configurations during data ingest.
- Improved UX during CSV/JSON import that reports to users which erroneous CSV lines or JSON elements were skipped.
- New JSON data type that you can use to store JSON blobs as node/relationship properties.
- New official Golang API so that you can build applications on top of Kùzu using Go!
The following sections go into a lot more detail on the above features and more, so let’s dive in!
Performance and scalability improvements
New recursive path finding algorithms
We did a significant rewrite of the part of the query processor that evaluates recursive relationship pattern clauses (or recursive joins) in Cypher. These are the clauses that contain the Kleene star (`*`) syntax in relationship patterns, such as the following:
```cypher
MATCH (a:Person)-[e:Knows*1..10]->(b:Person)
MATCH (a:Person)-[e:Knows* SHORTEST]->(b:Person)
```
These are perhaps the most expensive joins that graph DBMSs evaluate: they are recursive, and in many databases each node connects to many other nodes (i.e., the joins are many-to-many), so the search space grows very quickly. Even searching for paths of relatively small lengths, say 4 or 5, from a single source can end up scanning and processing large chunks of, or even entire, databases.
Prior to this release, our implementation was not very optimized: we hadn't yet parallelized the evaluation of these queries, and we used sparse intermediate data structures. As a result, the data structures storing already-reached nodes during a shortest path computation would grow as the number of reached nodes grows. This works fine when the depth of the recursion is small, but was very slow on queries with longer recursion depths over large datasets.
In this release, we've parallelized the implementation, started using dense structures, and added many other optimizations. For example, we store the intermediate paths during the computation very compactly while pushing down `LIMIT`, and we add additional acyclic/trail/walk semantic checks to the computation.
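As an illustration of the `LIMIT` pushdown, consider a query like the following sketch (using the same schema as the examples above, with an arbitrary source ID): the `LIMIT` can be pushed into the recursive computation so that it stops early once 10 results are found, instead of materializing paths to all reachable nodes.

```cypher
// LIMIT is pushed into the recursive join, so the search can stop early.
MATCH p = (a:Person)-[e:Knows* SHORTEST]->(b:Person)
WHERE a.ID = 0
RETURN b.ID, length(e)
LIMIT 10;
```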
For readers who are familiar with the low-level details of query processing techniques, our new implementation mimics those in parallel graph analytics systems, such as Ligra, Pregel, or GraphChi. The technical details are beyond the scope of this release post; we plan to write a separate blog post on this topic later. From the user perspective, nothing changes, except that you should observe your queries using recursive path patterns getting faster.
All of the previous recursive path features are supported as before, e.g., filters on intermediate nodes and relationships, and the acyclic, trail, and walk path semantics. If you observe a slowdown, it's certainly a performance bug, so let us know and we will fix it! Oh, and all of the recursive computations happen by scanning data from disk, so we are not building any in-memory index structures on the fly that could limit scalability. Kùzu's relationship/adjacency list storage is disk-based but already optimized for good scan performance. So if your graphs are very large and don't fit into Kùzu's buffer manager, we will scale out of memory transparently.
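For example, filters on intermediate relationships continue to work as before. Here is a minimal sketch, assuming a hypothetical `since` property on the `Knows` relationship table (the `(r, n | WHERE ...)` part binds each intermediate relationship and node for filtering):

```cypher
// Only traverse Knows edges whose (hypothetical) since property is >= 2020.
MATCH (a:Person)-[e:Knows*1..4 (r, n | WHERE r.since >= 2020)]->(b:Person)
WHERE a.ID = 0
RETURN DISTINCT b.ID;
```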
Below is a demonstrative micro-benchmark of the performance improvements on large databases. Let’s take a query that finds the shortest path lengths from a single source to all other nodes. We use two input graphs:
- LDBC1000-PK: LDBC1000 is LDBC's social network benchmark dataset at scale 1000 (so this is a 1TB-scale database). In LDBC1000-PK, we use only the subgraph of Person nodes and Knows edges between Person nodes. LDBC1000-PK contains around 3.2M nodes and 202M edges.
- Graph500-30: This is the largest graph in LDBC's Graphalytics benchmarks. It contains 448M nodes and 17B edges, and takes 495GB on disk when loaded into Kùzu.
All experiments we present in this post use a machine with 2 AMD EPYC 7551 processors (32 cores), 512GB of DDR4 memory, and 2TB SSDs. We run Kùzu with its default settings.
In the next experiment, we find the length of shortest paths from a single source to all other nodes and count the number of nodes at each length. The query template looks as follows:
```cypher
MATCH p = (n1:person)-[e:knows* SHORTEST]->(n2:person)
WHERE n1.ID = {source_id}
RETURN length(e), count(*);
```
| # Threads | 1 | 2 | 4 | 8 | 16 | 32 |
|---|---|---|---|---|---|---|
| 0.6.1 (LDBC1000-PK) | 56.4s | 55.8s | 55.7s | 54.1s | 52.1s | 50.4s |
| 0.7.0 (LDBC1000-PK) | 3.33s | 1.84s | 1.13s | 0.66s | 0.40s | 0.32s |
| 0.6.1 (Graph500-30) | Timeout | Timeout | Timeout | Timeout | Timeout | Timeout |
| 0.7.0 (Graph500-30) | 170.1s | 110.1s | 49.5s | 29s | 16.6s | 13.5s |
On the 202M edge LDBC sub-graph, we compute shortest path lengths to all destinations in 0.32s using 32 threads. This is a relatively small part of the entire LDBC database, and Kùzu does not scan much data from disk.
On the 17B edge Graph500-30 graph, we compute shortest path lengths in 13.5s. Note that we're finding shortest paths to 448M other nodes and outputting 448M tuples from the recursive join operator, and this is happening on a completely disk-based implementation, so all scans happen through the buffer manager. At this scale, there is actual I/O happening at each iteration of the shortest path computation, and the computation takes 16 iterations (so you can assume we're scanning large chunks of the 495GB database in many of the iterations). The previous version of Kùzu hits our timeout, which is set to 10 minutes, on this graph, while the new version can finish the query in 13.5 seconds using 32 threads.
It’s clear that both cases parallelize quite well and are much faster than the previous version of Kùzu.
There are still more optimizations on our roadmap to make these even faster, so you can keep an eye on how these numbers are improving over upcoming releases.
Data spilling during bulk relationship ingestion (COPY FROM)
The recommended (fast) way to ingest large amounts of data into Kùzu is the `COPY FROM` statement. `COPY FROM` can be used to ingest records from a source into a node or relationship table. The source data can be a local or remote file in some tabular format, such as CSV, Parquet, or JSON; an in-memory object, such as a Pandas or Polars DataFrame; or the results of another subquery.
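For example, typical bulk loads from files look like the following (the table and file names here are illustrative):

```cypher
// Bulk-load node records from a Parquet file.
COPY Person FROM 'persons.parquet';
// Bulk-load relationship records from a CSV file.
COPY Knows FROM 'knows.csv';
```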
Prior to this release, `COPY FROM` had a scalability issue when ingesting billions of records into a relationship table. Specifically, Kùzu required storing all of the relationship records from the source in memory before it could start creating its persistent disk-based structures. This meant that if you had machines with relatively low RAM, you would have to manually chunk the source data into smaller parts; otherwise, the `COPY FROM` pipeline could run out of memory and fail.
In this release, Kùzu automatically spills the records it scans from the source table into a temporary file `.tmp`, located inside the database directory on disk, during `COPY FROM`. The spilling is in fact done automatically by the buffer manager as `COPY FROM` demands more and more memory. It is turned on by default when you open a local Kùzu database in read-write mode, and you can turn it off with `CALL spill_to_disk=false;`. Note that we don't support spilling under in-memory mode for now.
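Putting this together, disabling spilling before a large copy looks like the following sketch (the file name is illustrative); with spilling left on, which is the default, the same `COPY FROM` would spill to the `.tmp` file instead of failing when memory runs out:

```cypher
// Disable spilling: COPY FROM will now fail instead of spilling
// to the .tmp file if the buffer manager runs out of memory.
CALL spill_to_disk=false;
COPY Knows FROM 'knows.csv';
```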
As a result of this improvement, Kùzu can now ingest very large sets of relationship records (such as the edges of the 17B-edge Graph500-30 graph in the above experiment) when it is running on a machine with limited memory.
Below are the loading times for the Graph500-30 graph when we limit the buffer manager capacity of the system. We use 32 threads on the same machine as in the above experiment:
| BM Size | Loading time | Amount of spilled data |
|---|---|---|
| 410GB | 3613s | 420GB |
| 205GB | 4081s | 638GB |
| 102GB | 4276s | 736GB |
Using 32 threads, we can load 17B edges in 1 hour when giving 410GB of memory to the buffer manager, 1 hour and 8 minutes when giving 205GB, and 1 hour and 10 minutes when using 102GB. These numbers should look very good to anyone working with large datasets and existing GDBMSs. We are very pleased with the performance of the `COPY FROM` pipeline for really large graphs, and highly recommend pushing it to its limits if you need to ingest and work with very large databases.
Zone maps
In this release we also implemented another optimization: zone maps, a widely adopted technique in columnar systems to speed up scans when a query filters on a numeric column. The idea is to store statistics, specifically the minimum and maximum values, per chunk of the column (in our case, per node group). When there is a filter on the property stored in the column, the system can use the minimum and maximum values to infer whether the chunk can contain any value that passes the filter. For example, consider the following query:
```cypher
MATCH (a:Person)
WHERE a.age < 10
RETURN *
```
To evaluate this query, Kùzu needs to scan the `age` column of the `Person` node table. If, for a particular node group `j`, the minimum value stored in the zone map is greater than or equal to 10, say 15, then the system can skip scanning the entire node group, since no value in node group `j` can pass the filter.
Below is a simple experiment using 32 threads that demonstrates the performance benefits of zone maps. We run the following query template on LDBC1000's `Comment` node table, which contains 2.2B nodes:
```cypher
MATCH (c:Comment)
WHERE c.length > $length
RETURN *;
```
| $length | Output size | Runtime w/o zone maps | Runtime w/ zone maps |
|---|---|---|---|
| 2000 | 0 | 12.8s | 0.13s |
| 1900 | 4597 | 12.8s | 3.56s |
| 1500 | 23195 | 12.3s | 8.98s |
| 1000 | 46640 | 12.0s | 11.1s |
So if your query is searching for a very selective property range, as in the first row, large chunks of the column can be skipped during the scan, improving your query's performance significantly! Zone maps are automatically enabled on numeric columns and automatically used, so you do not have to do anything in your applications to benefit from this optimization.
Other Optimizations
ALP floating point data compression
We are continuing to improve our storage layer by incorporating new compression algorithms. This release implements the ALP (Adaptive Lossless floating-Point) compression algorithm for `FLOAT` and `DOUBLE` values in the system.
Here is an example to demonstrate the compression ratio of ALP. We use two datasets, US Stocks and CommonGovernment_1. The US Stocks dataset contains 9.4M tuples consisting of a single `DOUBLE` column, while CommonGovernment_1 contains 11.3M tuples consisting of 9 `DOUBLE` columns. We create node tables with the same schema as these datasets and run a `COPY FROM` statement to bulk-ingest the CSV data into the corresponding node tables.
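As a rough sketch, the setup for the single-column US Stocks dataset looks something like the following (the table schema and file name are illustrative, not the exact benchmark setup):

```cypher
// SERIAL primary keys are auto-generated, so the CSV only needs the DOUBLE column.
CREATE NODE TABLE Stock (id SERIAL, price DOUBLE, PRIMARY KEY (id));
// ALP compression is applied automatically to the DOUBLE column during ingest.
COPY Stock FROM 'us_stocks.csv';
```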
| Dataset | 0.6.1 DB size | 0.6.1 Copy time | 0.7.0 (w/ ALP) DB size | 0.7.0 Copy time |
|---|---|---|---|---|
| US Stocks | 99MB | 0.66s | 45MB (2.2x) | 0.67s (1.01x slowdown) |
| CommonGovernment_1 | 825MB | 1.41s | 290.5MB (2.8x) | 1.81s (1.28x slowdown) |
We get, respectively, 2.2x and 2.8x compression ratios for the two datasets with 1.01x and 1.28x slowdown of the bulk ingestion operation. Aside from the reduction in the database size, compression should improve general query performance because it leads to less I/O and allows keeping a larger fraction of the database in memory. As with zone maps, this optimization also does not require any manual configuration and you will automatically benefit from it.
Filter/projection push down to attached relational databases
This performance optimization is related to attaching to external DuckDB, Postgres, and SQLite databases using Kùzu's external RDBMS extensions. Suppose you have attached to a remote Postgres database `uw` that has a `Person` table, and you would like to copy the records in this table into a `Person` node table in Kùzu. You can run the following query:
```cypher
COPY Person FROM (
    LOAD FROM uw.Person
    WHERE age > 10
    RETURN name
)
```
The `age > 10` predicate in the above query is part of the `LOAD FROM` and filters the tuples in the Postgres table based on their `age` values. Part of the query's execution requires sending a SQL statement to Postgres to scan the `uw.Person` records. Previously, Kùzu would scan all tuples in `uw.Person` using a simple `SELECT * FROM uw.person` query and run any filters and projections in its own query processor. Starting from this release, we now push down filters in the `WHERE` clause to the SQL query sent to Postgres. The same happens for projections, i.e., when possible, projections are also pushed down into the SQL queries sent to the external RDBMSs.
User-facing features
JSON data type
If you work with JSON data, you'll find that this latest release has a new data type: `JSON`. In Kùzu's previous versions, you could scan and load data from JSON files, but you could not have node or relationship properties of type `JSON`. To use this data type, you need to install the JSON extension. Along with this data type, we also provide a set of functions to make it easy to work with JSON data.
The following example creates a node table Person with a `JSON` property, then creates two JSON objects in this column using the `to_json` function, and outputs them.
```cypher
INSTALL json;
LOAD EXTENSION json;
CREATE NODE TABLE Person (id INT64, description JSON, primary key(id));
CREATE (p:Person {id: 20, description: to_json({height: 52, age: 32, scores: [1,2,5]})});
CREATE (p:Person {id: 40, description: to_json({age: 55, scores: [1,32,5,null], name: 'dan'})});
```
You can then query the JSON objects by using the `json_extract` function to access their fields:
```cypher
MATCH (p:Person)
WHERE json_extract(p.description, 'age') < 50
RETURN p.id AS id, json_extract(p.description, 'age') AS age;
```
```
┌───────┬────────┐
│ id    │ age    │
│ INT64 │ json   │
├───────┼────────┤
│ 20    │ 32     │
└───────┴────────┘
```
Allow skipping erroneous rows during COPY/LOAD FROM
We've enhanced the usability of `COPY FROM` (and `LOAD FROM`) statements when scanning raw files, such as CSV or JSON, by allowing users to skip erroneous or malformed rows during the ingestion process. There can be several reasons for errors, such as a CSV line being malformed, or a line leading to duplicate primary keys when copying into a node table. Having the ability to skip erroneous rows (without the entire `COPY` operation failing) greatly improves the user experience of `COPY FROM`.
By specifying `IGNORE_ERRORS=TRUE` in a `COPY FROM` statement, you can skip rows that trigger exceptions and continue ingesting all other rows. To view the error messages and skipped rows in detail, you can use the new `show_warnings()` function. More details can be found in our docs. The same functionality also works with `LOAD FROM`.
CSV auto detection
To make it easier for users to work with data in CSV format, we implemented our own CSV sniffer.
Kùzu can now automatically detect several CSV configurations, such as the delimiter, quote, and escape characters, and apply them in `COPY`/`LOAD FROM` statements. See our docs for some examples of how CSV auto-detection works.
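For example, you can now scan a CSV file without specifying any configuration and let the sniffer figure it out (the file name here is illustrative):

```cypher
// Delimiter, quote, and escape characters are detected automatically.
LOAD FROM 'people.csv' RETURN *;
```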
Attach remote DuckDB
Earlier releases allowed attaching Kùzu to local DuckDB databases. This release extends the functionality to also allow attaching to remote DuckDB databases on S3. The syntax looks something like this:
```cypher
SET S3_URL_STYLE='VHOST';
ATTACH 's3://my_s3_url/my_bucket/persons.duckdb' as person_db (dbtype duckdb);
LOAD FROM person_db.persons RETURN COUNT(*);
```
All you need to do is point to the S3 URL that contains a remote DuckDB database and Kùzu can then scan and load data from it. See the external RDBMS docs for more details.
Golang API
A lot of our users work with Golang and have been requesting official support for it in Kùzu, so we are pleased to announce our new Golang client API in this release! The Go package is a wrapper around our C API, so it shares the same performance and scalability characteristics. If your primary application language is Go, please give this a go (pun intended 😄). You can check out the documentation for the Golang API here.
(Temporary) Removal of RDFGraphs
After much deliberation, we have removed our RDFGraphs feature from Kùzu beginning with this release. We had originally added this feature to enable users to import and query their raw RDF data, e.g., a Turtle file, in Kùzu, so they could do basic querying over their RDF triples using Cypher (similar to Neosemantics). However, we had designed this as a core extension of our data model. Over time, we realized this was too aggressive and that maintaining it as a core feature was slowing overall development, so we decided to remove it from the core system. We are instead discussing repackaging it as an extension and releasing it at some later date. Some discussion around this decision can be found here. Users who are interested in using RDFGraphs can continue using version 0.6.1 of Kùzu. If you'd like to discuss your RDF use cases further, please join us on Discord and create a discussion issue on the #discussion channel to start a conversation.
Closing remarks
Version 0.7.0 furthers Kùzu’s larger vision of enabling users to build faster and more scalable graph-based applications, and we couldn’t be more excited to see what you build with it! Please give this release a try and chat with us and other users on Discord if you have any questions. As always, you can check out the release notes on GitHub for a full list of all the bugfixes and updates in this release. We thank everyone who worked hard on this release, especially our amazing interns!
Till next time!