
Optimizing speed

The speed of bulk loading can be optimized in two ways: by batching queries, and by parallelizing transactions. On this page, we’ll explore these methods.

For the examples shown on this page, we’ll load data into a bookstore database. We’ll assume the data is pre-formatted into TypeQL files containing one Insert query per line. We’ll consider data relating to contributors, publishers, and books, in the files contributors.tql, publishers.tql, and books.tql respectively.

Example queries
  • Example line from contributors.tql:

    insert $contributor isa contributor; $contributor has name "J.R.R. Tolkien";
  • Example line from publishers.tql:

    insert $publisher isa publisher; $publisher has name "Harper Collins";
  • Example line from books.tql:

    match $contributor_1 isa contributor; $contributor_1 has name "J.R.R. Tolkien"; $publisher isa publisher; $publisher has name "Harper Collins"; insert $book isa ebook; $book has isbn-13 "9780008627843"; $book has title "The Hobbit"; $book has page-count 310; $book has price 16.99; $book has isbn-10 "0008627843"; $book has genre "fiction"; $book has genre "fantasy"; (work: $book, author: $contributor_1) isa authoring; (work: $book, illustrator: $contributor_1) isa illustrating; (published: $book, publisher: $publisher) isa publishing;

The Insert queries for books reference contributors and publishers, so all the contributors and publishers must be inserted before we can begin inserting books.

Batching queries

Normally, transactions are used to represent logically atomic business operations, ensuring that the database is always in a consistent state when read. However, consistency is not typically a requirement until after bulk loading has finished, so we can use transactions to batch queries instead. The commit is the most computationally expensive part of a transaction. Rather than committing after every insert, we can commit inserts in large batches, significantly speeding up the process.

The following code snippets show example implementations of query batching using the TypeDB Python, Node.js, and Java drivers. To begin with, we define iterators that will pipeline the queries into batches for us.

Imports
  • Python

  • Node.js

  • Java

import os
from collections.abc import Iterator
from multiprocessing import Queue, Manager, Pool
from typedb.api.connection.credential import TypeDBCredential
from typedb.api.connection.session import TypeDBSession, SessionType
from typedb.api.connection.transaction import TransactionType
from typedb.driver import TypeDB
import {TypeDB} from "typedb-driver/TypeDB";
import {SessionType, TypeDBSession} from "typedb-driver/api/connection/TypeDBSession";
import {TransactionType, TypeDBTransaction} from "typedb-driver/api/connection/TypeDBTransaction";
import * as fs from "fs";
import * as readline from "readline";
import {TypeDBCredential, TypeDBDriver} from "typedb-driver";
import * as os from "os";
import com.vaticle.typedb.common.collection.Either;
import com.vaticle.typedb.common.concurrent.NamedThreadFactory;
import com.vaticle.typedb.driver.TypeDB;
import com.vaticle.typedb.driver.api.TypeDBCredential;
import com.vaticle.typedb.driver.api.TypeDBDriver;
import com.vaticle.typedb.driver.api.TypeDBSession;
import com.vaticle.typedb.driver.api.TypeDBTransaction;
import javax.lang.model.type.NullType;
import java.io.*;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.nio.charset.StandardCharsets.UTF_8;
  • Python

  • Node.js

  • Java

def query_iterator(file_paths: list[str]) -> Iterator[str]:
    for path in file_paths:
        with open(path, "r") as file:
            for line in file:
                yield line


def batch_iterator(file_paths: list[str], batch_size: int) -> Iterator[list[str]]:
    next_batch: list[str] = list()

    for query in query_iterator(file_paths):
        next_batch.append(query)

        if len(next_batch) >= batch_size:
            yield next_batch
            next_batch = []

    if len(next_batch) > 0:
        yield next_batch
class QueryIterator implements AsyncIterable<string> {
    private filepaths: Array<string>;
    constructor(filepaths: Array<string>) {
        this.filepaths = filepaths;
    }

    async *[Symbol.asyncIterator](): AsyncIterableIterator<string> {

        for (let filepath of this.filepaths) {
            let file = readline.createInterface(fs.createReadStream(filepath));

            for await (let query of file) {
                yield query;
            }
        }
    }
}


class BatchIterator implements AsyncIterable<Array<string>> {
    private filepaths: Array<string>;
    private batchSize: number;

    constructor(filepaths: Array<string>, batchSize: number) {
        this.filepaths = filepaths;
        this.batchSize = batchSize;
    }

    async *[Symbol.asyncIterator](): AsyncIterator<Array<string>> {
        let queryIterator = new QueryIterator(this.filepaths);
        let nextBatch: Array<string> = [];

        for await (let query of queryIterator) {
            nextBatch.push(query);

            if (nextBatch.length >= this.batchSize) {
                yield nextBatch;
                nextBatch = [];
            }
        }

        if (nextBatch.length > 0) { yield nextBatch }
    }
}
public class QueryIterator implements Iterator<String> {
    public final List<String> filepaths;
    private Iterator<String> lineIterator;
    private boolean finished;

    public QueryIterator(ArrayList<String> filepaths) throws FileNotFoundException {
        this.filepaths = filepaths;
        finished = false;
        prepNextIterator();
    }

    private static InputStream inputStream(String filepath) throws FileNotFoundException {
        try {
            return new BufferedInputStream(new FileInputStream(filepath));
        } catch (IOException e) {
            throw new FileNotFoundException();
        }
    }

    private void prepNextIterator() throws FileNotFoundException {
        if (this.filepaths.isEmpty()) {
            finished = true;
        } else {
            InputStream inputStream = inputStream(filepaths.remove(0));
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, UTF_8));
            lineIterator = bufferedReader.lines().iterator();
            if (!lineIterator.hasNext()) prepNextIterator();
        }
    }

    @Override
    public boolean hasNext() {
        return !finished;
    }

    @Override
    public String next() {
        String nextItem = lineIterator.next();

        if (!lineIterator.hasNext()) {
            try {
                this.prepNextIterator();
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            }
        }

        return nextItem;
    }
}


public class BatchIterator implements Iterator<List<String>> {
    public int batchSize;
    private final Iterator<String> queries;

    public BatchIterator(ArrayList<String> filepaths, int batchSize) throws FileNotFoundException {
        this.batchSize = batchSize;
        this.queries = new QueryIterator(filepaths);
    }

    @Override
    public boolean hasNext() { return queries.hasNext(); }

    @Override
    public List<String> next() {
        if (!this.hasNext()) throw new NoSuchElementException();
        List<String> batch = new ArrayList<>();

        while (queries.hasNext() && batch.size() < batchSize) {
            batch.add(queries.next());
        }

        return batch;
    }
}

Now we can instantiate and consume a batch iterator from a function that will load the batches into the database. To load each batch, we open a transaction, execute each insert query in the batch, and then commit the transaction.

  • Python

  • Node.js

  • Java

def load_batch(session: TypeDBSession, batch: list[str]) -> None:
    with session.transaction(TransactionType.WRITE) as transaction:
        for query in batch:
            transaction.query.insert(query)

        transaction.commit()


def load_data(addresses: str | list[str], username: str, password: str) -> None:
    database = "bookstore"
    data_files = ["contributors.tql", "publishers.tql", "books.tql"]
    batch_size = 100
    credential = TypeDBCredential(username, password, tls_enabled=True)

    with TypeDB.cloud_driver(addresses, credential) as driver:
        with driver.session(database, SessionType.DATA) as session:
            for batch in batch_iterator(data_files, batch_size):
                load_batch(session, batch)
async function loadBatch(batch: Array<string>, session: TypeDBSession): Promise<void> {
    let transaction: TypeDBTransaction | undefined;

    try {
        transaction = await session.transaction(TransactionType.WRITE);

        for (let query of batch) {
            transaction.query.insert(query);
        }

        await transaction.commit();
    } finally {
        if (transaction?.isOpen()) {
            await transaction.close();
        }
    }
}


async function loadData(addresses: Array<string>, username: string, password: string): Promise<void> {
    const database: string = "bookstore";
    const dataFiles: Array<string> = ["contributors.tql", "publishers.tql", "books.tql"];
    const batchSize: number = 100;

    let credential: TypeDBCredential = new TypeDBCredential(username, password);
    let driver: TypeDBDriver | undefined;

    try {
        driver = await TypeDB.cloudDriver(addresses, credential);
        let session: TypeDBSession | undefined;

        try {
            session = await driver.session(database, SessionType.DATA);
            let batchIterator = new BatchIterator(dataFiles, batchSize);

            for await (let batch of batchIterator) {
                await loadBatch(batch, session);
            }
        } finally { await session?.close() }
    } finally { await driver?.close() }
}
public static void loadBatch(TypeDBSession session, List<String> batch) {
    try (TypeDBTransaction transaction = session.transaction(TypeDBTransaction.Type.WRITE)) {
        for (String query: batch) {
            transaction.query().insert(query);
        }
        transaction.commit();
    }
}


public static void loadData(Set<String> addresses, String username, String password) throws FileNotFoundException {
    String database = "bookstore";
    ArrayList<String> dataFiles = new ArrayList<>(List.of("contributors.tql", "publishers.tql", "books.tql"));
    int batchSize = 100;
    TypeDBCredential credential = new TypeDBCredential(username, password, true);

    try (TypeDBDriver driver = TypeDB.cloudDriver(addresses, credential)) {
        try (TypeDBSession session = driver.session(database, TypeDBSession.Type.DATA)) {
            BatchIterator batchIterator = new BatchIterator(dataFiles, batchSize);

            while (batchIterator.hasNext()) {
                List<String> batch = batchIterator.next();
                loadBatch(session, batch);
            }
        }
    }
}

During a transaction, the changes to the database must be held in memory until the commit. This means that if batches become too large, transactions will be slowed down. As a result, the optimum number of queries per batch depends on the queries being run as well as the configuration of the server. The simpler the queries and the more powerful the server, the larger the optimum batch size.

For most applications, the optimum batch size will likely be between 50 and 250 Insert queries.
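
A good batch size is best found empirically. The following sketch (our own helper, not part of the driver API) times the load_batch function defined above at a candidate batch size. Since every trial inserts real data, run it against a scratch database and recreate the database between trials so the measurements stay comparable.

import time

def time_batch_size(session: TypeDBSession, file_paths: list[str], batch_size: int, max_queries: int = 1000) -> float:
    # Load up to max_queries queries in batches of batch_size,
    # then report the throughput in queries per second.
    loaded = 0
    start = time.perf_counter()

    for batch in batch_iterator(file_paths, batch_size):
        load_batch(session, batch)
        loaded += len(batch)

        if loaded >= max_queries:
            break

    return loaded / (time.perf_counter() - start)

# Example: compare candidate sizes against a scratch database.
# for size in (50, 100, 250):
#     print(size, time_batch_size(session, ["contributors.tql"], size))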

It is important to note that if any query in a transaction causes an exception to be thrown, the entire transaction fails. This makes batching queries that might fail, such as those leveraging key constraints, tricky: the whole batch must be re-issued without the offending query.
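
One workable pattern is to fall back to loading a failed batch one query per transaction, so that only the offending queries are discarded. The sketch below wraps the load_batch function defined above; the wrapper name and the broad exception handling are our own illustration, and in practice you would narrow the except clauses to the driver’s exception type.

def load_batch_with_fallback(session: TypeDBSession, batch: list[str]) -> None:
    try:
        load_batch(session, batch)
    except Exception:
        # The whole batch was rolled back: retry each query in its own
        # transaction so a single bad query cannot discard its neighbours.
        for query in batch:
            try:
                load_batch(session, [query])
            except Exception as exception:
                # Log or collect the rejected query for later inspection.
                print(f"Skipped query: {query.strip()} ({exception})")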

Parallelizing transactions

To further increase the speed of bulk loading, we can open multiple transactions that will be processed on the server in parallel. To do so, our client-side code must utilize either concurrency or parallelism to manage the open transactions simultaneously.

Parallelizing transactions requires a more complex approach than query batching, so it is not recommended to parallelize transactions without also batching queries: each technique used alone yields similar gains, and the best performance is achieved by combining both.

The following code snippets show example implementations of transaction parallelization in combination with query batching using the TypeDB Python, Node.js, and Java drivers. They make use of some of the code shown previously on this page: the batch iterators for all languages, and the loadBatch function for the Node.js example. The way parallel transactions are issued differs significantly between programming language paradigms:

Using Python

Python utilizes a global interpreter lock to guarantee thread safety. In order to parallelize transactions on the server, multiple Python processes must be started using the multiprocessing module, with each transaction managed by a separate process. In the code snippet below, we use a multiprocessing pool of batch_loader functions executed in parallel, with each consuming query batches from a shared queue. We cannot share the same driver and session instances between processes, so we must open one of each per process. This is contrary to typical best practice for TypeDB: normally a program should only need one driver and one session instance at a time.

Using Node.js

JavaScript and TypeScript use event loops to enable concurrency by default. To parallelize transactions on the server, we can simply start them serially and await them all at the end of the program. This could consume a large amount of memory if care is not taken, so it is best to limit the number of transactions open at any given time. In the code snippet below, we manage the transactions using the PromisePool class. Once the pool is filled to the maximum number of concurrent transactions, it blocks further transactions from being added until one of those in flight completes.

Using Java (or other driver languages)

For Java, and most other languages, transactions can be parallelized on the server by making use of multithreading. In the code snippet below, we use a thread pool to schedule multiple batch loaders that will be executed concurrently, consuming query batches from a shared queue. Unlike the Python implementation, the batch loaders can share network resources, so only a single driver and session instance are required.

  • Python

  • Node.js

  • Java

def batch_loader(
    queue: Queue,
    addresses: str | list[str],
    username: str,
    password: str,
    database: str
) -> None:
    credential = TypeDBCredential(username, password, tls_enabled=True)

    with TypeDB.cloud_driver(addresses, credential) as driver:
        with driver.session(database, SessionType.DATA) as session:
            while True:
                batch: list[str] | None = queue.get()

                if batch is None:
                    break
                else:
                    with session.transaction(TransactionType.WRITE) as transaction:
                        for query in batch:
                            transaction.query.insert(query)

                        transaction.commit()


def load_data_async(addresses: str | list[str], username: str, password: str) -> None:
    database = "bookstore"
    data_files = ["contributors.tql", "publishers.tql", "books.tql"]
    batch_size = 100

    for data_file in data_files:
        with Manager() as manager:
            pool_size = os.cpu_count()
            pool = Pool(pool_size)
            queue = manager.Queue(4 * pool_size)

            kwargs = {
                "queue": queue,
                "addresses": addresses,
                "username": username,
                "password": password,
                "database": database,
            }

            for _ in range(pool_size):
                pool.apply_async(batch_loader, kwds=kwargs)

            for batch in batch_iterator([data_file], batch_size):
                queue.put(batch)

            for _ in range(pool_size):
                queue.put(None)  # Sentinel value

            pool.close()
            pool.join()
class PromisePool<T> {
    public size: number;
    public promises: Array<Promise<{index: number; result: T}>>;
    constructor(size: number) {
        this.size = size;
        this.promises = [];
    }

    async put(value: Promise<T>): Promise<void | T> {
        if (this.promises.length < this.size) {
            let index = this.promises.length;
            this.promises.push(value.then(result => ({index: index, result})));
            return;
        } else {
            let {index, result} = await Promise.race(this.promises)
            this.promises[index] = value.then(result => ({index: index, result}));
            return result;
        }
    }

    async join(): Promise<Array<T>> {
        let results: Array<T> = [];
        let outputs = await Promise.all(this.promises)

        for (let output of outputs) {
            results.push(output.result)
        }

        return results;
    }
}


async function loadDataAsync(addresses: Array<string>, username: string, password: string): Promise<void> {
    const database: string = "bookstore";
    const dataFiles: Array<string> = ["contributors.tql", "publishers.tql", "books.tql"];
    const batchSize: number = 100;

    let credential: TypeDBCredential = new TypeDBCredential(username, password);
    let driver: TypeDBDriver | undefined;

    try {
        driver = await TypeDB.cloudDriver(addresses, credential);
        let session: TypeDBSession | undefined;

        try {
            session = await driver.session(database, SessionType.DATA);

            for (let dataFile of dataFiles) {
                let poolSize = os.cpus().length;
                let promisePool = new PromisePool(poolSize);
                let batchIterator = new BatchIterator([dataFile], batchSize);

                for await (let batch of batchIterator) {
                    await promisePool.put(loadBatch(batch, session));
                }

                await promisePool.join();
            }
        } finally { await session?.close() }
    } finally { await driver?.close() }
}
public static CompletableFuture<Void> scheduleBatchLoader(
        ExecutorService executor,
        LinkedBlockingQueue<Either<List<String>, NullType>> queue,
        TypeDBSession session,
        AtomicBoolean hasError
) {
    return CompletableFuture.runAsync(() -> {
        Either<List<String>, NullType> queries;
        try {
            while ((queries = queue.take()).isFirst() && !hasError.get()) {
                try (TypeDBTransaction transaction = session.transaction(TypeDBTransaction.Type.WRITE)) {
                    for (String query: queries.first()) {
                        transaction.query().insert(query);
                    }
                    transaction.commit();
                }
            }
        } catch (Throwable e) {
            hasError.set(true);
            throw new RuntimeException(e);
        }
    }, executor);
}


public static void loadDataAsync(Set<String> addresses, String username, String password) throws InterruptedException, FileNotFoundException {
    String database = "bookstore";
    ArrayList<String> dataFiles = new ArrayList<>(List.of("contributors.tql", "publishers.tql", "books.tql"));
    int batchSize = 100;
    TypeDBCredential credential = new TypeDBCredential(username, password, true);

    try (TypeDBDriver driver = TypeDB.cloudDriver(addresses, credential)) {
        try (TypeDBSession session = driver.session(database, TypeDBSession.Type.DATA)) {
            for (String dataFile: dataFiles) {
                int poolSize = Runtime.getRuntime().availableProcessors();
                ExecutorService executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(database));
                LinkedBlockingQueue<Either<List<String>, NullType>> queue = new LinkedBlockingQueue<>(4 * poolSize);
                List<CompletableFuture<Void>> batchLoadersFutures = new ArrayList<>(poolSize);
                AtomicBoolean hasError = new AtomicBoolean(false);

                for (int i = 0; i < poolSize; i++) {
                    batchLoadersFutures.add(scheduleBatchLoader(executor, queue, session, hasError));
                }

                BatchIterator batchIterator = new BatchIterator(new ArrayList<>(List.of(dataFile)), batchSize);

                while (batchIterator.hasNext() && !hasError.get()) {
                    List<String> batch = batchIterator.next();
                    queue.put(Either.first(batch));
                }

                for (int i = 0; i < poolSize; i++) {
                    queue.put(Either.second(null));
                }

                CompletableFuture.allOf(batchLoadersFutures.toArray(new CompletableFuture[0])).join();
                executor.shutdown();
            }
        }
    }
}

There are additional considerations when parallelizing transactions. Because batches are inserted concurrently, it is essential to ensure that the data each batch depends on has already been loaded. In the above examples, we guarantee that all the contributors and publishers are loaded before any books by creating one batch iterator and one loading pool per file. Handling exceptions thrown by problematic queries is also harder.
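
For instance, the Python implementation above will keep producing batches even after a worker process has failed. One remedy, mirroring the hasError flag in the Java snippet, is to share an event created with manager.Event(): workers set it when a batch raises, and the producer checks it before enqueuing more batches. The following sketch is our own variation on the batch_loader function above.

def batch_loader_safe(
    queue: Queue,
    has_error,  # shared flag created with manager.Event()
    addresses: str | list[str],
    username: str,
    password: str,
    database: str
) -> None:
    credential = TypeDBCredential(username, password, tls_enabled=True)

    with TypeDB.cloud_driver(addresses, credential) as driver:
        with driver.session(database, SessionType.DATA) as session:
            while True:
                batch: list[str] | None = queue.get()

                if batch is None or has_error.is_set():
                    break

                try:
                    with session.transaction(TransactionType.WRITE) as transaction:
                        for query in batch:
                            transaction.query.insert(query)

                        transaction.commit()
                except Exception:
                    has_error.set()  # tell the producer to stop enqueuing
                    raise

# In load_data_async, create the flag with has_error = manager.Event(),
# pass it to each worker, and guard the producer loop:
#     for batch in batch_iterator([data_file], batch_size):
#         if has_error.is_set():
#             break
#         queue.put(batch)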
