Rust driver tutorial
In this tutorial, we’ll build a sample application with the Rust driver capable of basic interaction with TypeDB:
-
Connect to a TypeDB server,
-
Manage databases and transactions,
-
Send different types of queries.
Follow the steps below or see the full source code.
See the full source code
use std::{error::Error, fs, io, process};
use std::io::{BufRead, Write};
use futures_util::stream::TryStreamExt;
use futures_util::StreamExt;
use typedb_driver::{answer::{ConceptRow, JSON}, Credentials, DriverOptions, Error as TypeDBError, TransactionType, TypeDBDriver};
/// Edition of the TypeDB server that the sample application connects to.
enum Edition {
    Core,
    Cloud,
}

/// Name of the database used by the sample application.
static DB_NAME: &str = "sample_app_db";
/// Address of the TypeDB server to connect to.
static SERVER_ADDR: &str = "127.0.0.1:1729";
/// Selects which kind of connection `driver_connect()` opens.
static TYPEDB_EDITION: Edition = Edition::Core;
/// Authentication credentials.
static USERNAME: &str = "admin";
static PASSWORD: &str = "password";
async fn fetch_all_users(driver: &TypeDBDriver, db_name: &str) -> Result<Vec<JSON>, Box<dyn Error>> {
let tx = driver.transaction(db_name, TransactionType::Read).await?;
let response = tx.query("match $u isa user; fetch { 'phone': $u.phone, 'email': $u.email };").await?;
let documents = response.into_documents().try_collect::<Vec<_>>().await?;
let mut documents_json = vec![];
for (index, document) in documents.into_iter().enumerate() {
let as_json = document.into_json();
println!("User #{}: {}", index, &as_json);
documents_json.push(as_json);
}
if documents_json.len() > 0 {
Ok(documents_json)
} else {
Err(Box::new(TypeDBError::Other("Error: No users found in a database.".to_string())))
}
}
async fn insert_new_user(
driver: &TypeDBDriver,
db_name: &str,
new_email: &str,
new_phone: &str,
new_username: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
let tx = driver.transaction(db_name, TransactionType::Write).await?;
let response = tx.query(&format!(
"insert $u isa user, has $e, has $p, has $username; $e isa email '{}'; $p isa phone '{}'; $username isa username '{}';",
new_email, new_phone, new_username
)).await?;
let rows = response.into_rows().try_collect::<Vec<_>>().await?;
for row in &rows {
let email = row.get("e").unwrap().try_get_string().unwrap();
let phone = row.get("p").unwrap().try_get_string().unwrap();
println!("Added new user. Phone: {}, E-mail: {}", phone, email);
}
if rows.len() > 0 {
tx.commit().await?;
Ok(rows)
} else {
Err(Box::new(TypeDBError::Other("Error: No new users created.".to_string())))
}
}
/// Finds the direct relatives of the user with the given e-mail and prints their usernames.
///
/// Fails unless exactly one user owns the e-mail. Returns one row per relative,
/// each exposing the `username` variable, sorted by username.
async fn get_direct_relatives_by_email(
    driver: &TypeDBDriver,
    db_name: &str,
    email: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Read).await?;

    // The e-mail has to identify exactly one user before we look for relatives.
    let user_query = format!("match $u isa user, has email '{}';", email);
    let matched_users = tx.query(&user_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    let user_count = matched_users.len();
    if user_count != 1 {
        return Err(Box::new(TypeDBError::Other(format!("Found {} users with email {}, expected 1.", user_count, email))));
    }

    let relatives_query = format!(
        "match
$e == '{}';
$u isa user, has email $e;
$family isa family ($u, $relative);
$relative has username $username;
not {{ $u is $relative; }};
select $username;
sort $username asc;
",
        email
    );
    let relatives = tx.query(&relatives_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    for (index, row) in relatives.iter().enumerate() {
        let username = row.get("username").unwrap().try_get_string().unwrap();
        println!("Relative #{}: {}", index + 1, username);
    }
    Ok(relatives)
}
/// Finds all relatives returned by the `all_relatives` schema function for the
/// user with the given e-mail and prints their usernames.
///
/// Fails unless exactly one user owns the e-mail. Returns one row per relative,
/// each exposing the `username` variable, sorted by username.
async fn get_all_relatives_by_email(
    driver: &TypeDBDriver,
    db_name: &str,
    email: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Read).await?;

    // The e-mail has to identify exactly one user before we look for relatives.
    let user_query = format!("match $u isa user, has email '{}';", email);
    let matched_users = tx.query(&user_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    let user_count = matched_users.len();
    if user_count != 1 {
        return Err(Box::new(TypeDBError::Other(format!("Found {} users with email {}, expected 1.", user_count, email))));
    }

    let relatives_query = format!(
        "match
$u isa user, has email $e;
$e == '{}';
let $relative in all_relatives($u);
not {{ $u is $relative; }};
$relative has username $username;
select $username;
sort $username asc;
",
        email
    );
    let relatives = tx.query(&relatives_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    for (index, row) in relatives.iter().enumerate() {
        let username = row.get("username").unwrap().try_get_string().unwrap();
        println!("Relative #{}: {}", index + 1, username);
    }
    Ok(relatives)
}
/// Replaces `old_phone` with `new_phone` for the user with the given e-mail.
///
/// The match, delete and insert stages run as a single query in one write
/// transaction, which is committed before the number of updated rows is
/// printed. Returns the rows produced by the query.
async fn update_phone_by_email(
    driver: &TypeDBDriver,
    db_name: &str,
    email: &str,
    old_phone: &str,
    new_phone: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Write).await?;
    let update_query = format!(
        "match $u isa user, has email '{email}', has phone $phone; $phone == '{old_phone}';
delete $phone of $u;
insert $u has phone '{new_phone}';"
    );
    let updated = tx.query(&update_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    tx.commit().await?;
    println!("Total number of users updated: {}", updated.len());
    Ok(updated)
}
/// Deletes every `user` that owns the given e-mail and prints how many were deleted.
///
/// The deletion runs in a write transaction that is committed before the
/// function returns, so the change is persisted.
async fn delete_user_by_email(driver: &TypeDBDriver, db_name: &str, email: &str) -> Result<(), Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Write).await?;
    let rows = tx.query(&format!(
        "match $u isa user, has email '{email}';
delete $u;"
    ))
        .await?
        .into_rows()
        .try_collect::<Vec<_>>()
        .await?;
    // Without a commit the deletion is never persisted: the user would still
    // be present the next time the database is opened.
    tx.commit().await?;
    println!("Deleted {} users", rows.len());
    Ok(())
}
/// Runs the six sample requests in order: fetch, insert, two relative lookups,
/// update and delete.
///
/// Each step asserts the number of results expected for the sample dataset;
/// the first failing request aborts the sequence with its error.
async fn queries(driver: &TypeDBDriver, db_name: &str) -> Result<(), Box<dyn Error>> {
    println!("\nRequest 1 of 6: Fetch all users as JSON objects with emails and phone numbers");
    let users = fetch_all_users(driver, db_name).await?;
    assert_eq!(users.len(), 3);

    let new_user_phone = "17778889999";
    let new_user_email = "k.koolidge@typedb.com";
    let new_user_username = "k-koolidge";
    println!("\nRequest 2 of 6: Add a new user with the email {} and phone {}", new_user_email, new_user_phone);
    let new_user = insert_new_user(driver, db_name, new_user_email, new_user_phone, new_user_username).await?;
    assert_eq!(new_user.len(), 1);

    let kevin_email = "kevin.morrison@typedb.com";
    println!("\nRequest 3 of 6: Find direct relatives of a user with email {}", kevin_email);
    let direct_relatives = get_direct_relatives_by_email(driver, db_name, kevin_email).await?;
    assert_eq!(direct_relatives.len(), 1);

    println!("\nRequest 4 of 6: Transitively find all relatives of a user with email {}", kevin_email);
    let all_relatives = get_all_relatives_by_email(driver, db_name, kevin_email).await?;
    assert_eq!(all_relatives.len(), 2);

    let old_kevin_phone = "110000000";
    let new_kevin_phone = "110000002";
    println!("\nRequest 5 of 6: Update the phone of a user with email {} from {} to {}", kevin_email, old_kevin_phone, new_kevin_phone);
    let updated_users = update_phone_by_email(driver, db_name, kevin_email, old_kevin_phone, new_kevin_phone).await?;
    assert_eq!(updated_users.len(), 1);

    // The user added in request 2 is removed again at the end.
    println!("\nRequest 6 of 6: Delete the user with email {}", new_user_email);
    delete_user_by_email(driver, db_name, new_user_email).await
}
// WARNING: before changing the AsRef parameters or the signature, ensure they aren't required as-is by code snippets throughout the docs
/// Opens a connection to the TypeDB server at `uri` for the selected edition.
///
/// `Edition::Core` connects to a single address; `Edition::Cloud` passes the
/// address as a one-element list. The two editions differ only in the first
/// `DriverOptions::new` argument (presumably the TLS switch; confirm against
/// the driver documentation).
async fn driver_connect(
    edition: &Edition,
    uri: &str,
    username: impl AsRef<str>,
    password: impl AsRef<str>,
) -> Result<TypeDBDriver, typedb_driver::Error> {
    let credentials = Credentials::new(username.as_ref(), password.as_ref());
    match edition {
        Edition::Core => {
            let options = DriverOptions::new(false, None).unwrap();
            #[allow(clippy::let_and_return, reason = "tutorial readability")]
            let driver = TypeDBDriver::new_core(uri, credentials, options).await;
            driver
        }
        Edition::Cloud => {
            let options = DriverOptions::new(true, None).unwrap();
            #[allow(clippy::let_and_return, reason = "tutorial readability")]
            let driver = TypeDBDriver::new_cloud(&vec![uri], credentials, options).await;
            driver
        }
    }
}
/// Creates the database `db_name`, then defines its schema from `schema.tql`
/// and loads the initial data from `data_small_single_query.tql`.
///
/// Returns `Ok(true)` once all three steps have succeeded.
async fn create_database(driver: &TypeDBDriver, db_name: impl AsRef<str>) -> Result<bool, Box<dyn Error>> {
    let name = db_name.as_ref();
    print!("Creating a new database...");
    if let Err(err) = driver.databases().create(name).await {
        return Err(Box::new(TypeDBError::Other(format!("Failed to create a DB, due to: {}", err))));
    }
    println!("OK");
    db_schema_setup(driver, name, "schema.tql").await?;
    db_dataset_setup(driver, name, "data_small_single_query.tql").await?;
    Ok(true)
}
/// Deletes the existing database `db_name` and creates it again from scratch.
///
/// Returns `Ok(true)` on success; deletion and creation failures are reported
/// with separate error messages.
async fn replace_database(driver: &TypeDBDriver, db_name: &str) -> Result<bool, Box<dyn Error>> {
    print!("Deleting an existing database...");
    let database = driver.databases().get(db_name).await?;
    if let Err(err) = database.delete().await {
        return Err(Box::new(TypeDBError::Other(format!("Failed to delete a database, due to: {}", err))));
    }
    println!("OK");
    match create_database(driver, db_name).await {
        Ok(_) => Ok(true),
        Err(err) => Err(Box::new(TypeDBError::Other(format!("Failed to create a new database, due to: {}", err)))),
    }
}
async fn db_schema_setup(driver: &TypeDBDriver, db_name: &str, schema_file_path: &str) -> Result<(), TypeDBError> {
let tx = driver.transaction(db_name, TransactionType::Schema).await?;
let schema_query = fs::read_to_string(schema_file_path)
.map_err(|err| TypeDBError::Other(format!("Error loading file content from '{schema_file_path}', due to: {}", err)))?;
print!("Defining schema...");
let response = tx.query(&schema_query).await?;
assert!(response.is_ok());
tx.commit().await?;
println!("OK");
Ok(())
}
async fn db_dataset_setup(driver: &TypeDBDriver, db_name: &str, data_file_path: &str) -> Result<(), Box<dyn Error>> {
let tx = driver.transaction(db_name, TransactionType::Write).await?;
let data = fs::read_to_string(data_file_path)
.map_err(|err| TypeDBError::Other(format!("Error loading file content from '{data_file_path}', due to: {}", err)))?;
print!("Loading data...");
let response = tx.query(&data).await?;
assert!(response.is_row_stream());
let results = response.into_rows().try_collect::<Vec<_>>().await?;
assert!(results.len() > 0);
tx.commit().await?;
println!("OK");
Ok(())
}
/// Checks that the database holds exactly the three users of the sample dataset.
///
/// Returns `Ok(true)` when the count matches and an error otherwise.
async fn validate_data(driver: &TypeDBDriver, db_name: &str) -> Result<bool, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Read).await?;
    print!("Validating the dataset...");
    let response = tx.query("match $u isa user; reduce $count = count;").await?;
    assert!(response.is_row_stream());
    // Only the first row of the answer is read; it carries the `count` value.
    let mut rows = response.into_rows();
    let row = rows.next().await.unwrap()?;
    let count = row.get("count").unwrap().try_get_integer().unwrap();
    if count != 3 {
        return Err(Box::new(TypeDBError::Other(format!("Validation failed, unexpected number of users: {}. Terminating...", count))));
    }
    println!("OK");
    Ok(true)
}
/// Makes sure the database `db_name` exists and holds the expected data.
///
/// A missing database is created. An existing one is replaced when `db_reset`
/// is `true` or when the user answers "y" (case-insensitive) to the prompt;
/// otherwise it is reused. The result of `validate_data()` is returned.
async fn db_setup(driver: &TypeDBDriver, db_name: &str, db_reset: bool) -> Result<bool, Box<dyn Error>> {
    println!("Setting up the database: {}", db_name);
    if driver.databases().contains(db_name).await? {
        let replace = if db_reset {
            true
        } else {
            print!("Found a pre-existing database. Do you want to replace it? (Y/N) ");
            io::stdout().flush()?;
            // A closed stdin (no line to read) counts as "no" instead of panicking;
            // a read error is propagated to the caller.
            let answer = io::stdin().lock().lines().next().transpose()?.unwrap_or_default();
            answer.trim().eq_ignore_ascii_case("y")
        };
        if replace {
            replace_database(driver, db_name).await?;
        } else {
            println!("Reusing an existing database.");
        }
    } else {
        // No such database found on the server
        create_database(driver, db_name).await?;
    }
    validate_data(driver, db_name).await
}
/// Entry point: connects to TypeDB, sets up the database and runs the sample queries.
///
/// Any error is printed and the process exits with status 1.
#[tokio::main]
async fn main() {
    println!("Sample App");
    let driver = driver_connect(&TYPEDB_EDITION, SERVER_ADDR, USERNAME, PASSWORD)
        .await
        .unwrap_or_else(|err| {
            println!("{err}");
            process::exit(1)
        });
    if let Err(err) = db_setup(&driver, DB_NAME, false).await {
        println!("{err}");
        process::exit(1);
    }
    if let Err(err) = queries(&driver, DB_NAME).await {
        println!("{err}");
        process::exit(1);
    }
}
Environment setup
To run this sample application, you’ll need:
-
TypeDB: either a TypeDB Cloud cluster or a self-hosted deployment. For installation instructions, see the Installation manual page.
-
Rust, cargo, and the TypeDB Rust driver. For the driver installation instructions, see the Rust driver page. If you see "unstable features"-related errors while running the example, try updating cargo by running
rustup update
.
The Sample application below is asynchronous.
Make sure to add the driver crate either in the Cargo.toml
or via the following command:
cargo add typedb-driver
Add -F sync
to the line above if you want to use a sync version of the driver for your applications.
Imported modules
To be able to use the TypeDB Rust driver API in the Sample application, use the following import statements:
use std::{error::Error, fs, io, process};
use std::io::{BufRead, Write};
use futures_util::stream::TryStreamExt;
use futures_util::StreamExt;
use typedb_driver::{answer::{ConceptRow, JSON}, Credentials, DriverOptions, Error as TypeDBError, TransactionType, TypeDBDriver};
Default values
We store default values as constants in the source code:
/// Edition of the TypeDB server that the sample application connects to.
enum Edition {
    Core,
    Cloud,
}

/// Name of the database used by the sample application.
static DB_NAME: &str = "sample_app_db";
/// Address of the TypeDB server to connect to.
static SERVER_ADDR: &str = "127.0.0.1:1729";
/// Selects which kind of connection `driver_connect()` opens.
static TYPEDB_EDITION: Edition = Edition::Core;
/// Authentication credentials.
static USERNAME: &str = "admin";
static PASSWORD: &str = "password";
where DB_NAME
— the name of the database to use;
SERVER_ADDR
— address of the TypeDB server to connect to;
TYPEDB_EDITION
— TypeDB Community Edition or Cloud edition selector;
USERNAME
/PASSWORD
— authentication credentials.
Program structure
The main workflow of this sample application includes establishing a connection to TypeDB, database setup, and querying.
/// Entry point: connects to TypeDB, sets up the database and runs the sample queries.
///
/// Any error is printed and the process exits with status 1.
#[tokio::main]
async fn main() {
    println!("Sample App");
    let driver = driver_connect(&TYPEDB_EDITION, SERVER_ADDR, USERNAME, PASSWORD)
        .await
        .unwrap_or_else(|err| {
            println!("{err}");
            process::exit(1)
        });
    if let Err(err) = db_setup(&driver, DB_NAME, false).await {
        println!("{err}");
        process::exit(1);
    }
    if let Err(err) = queries(&driver, DB_NAME).await {
        println!("{err}");
        process::exit(1);
    }
}
The entire main()
function code is executed in the context of the network connection, represented by the driver
object that is returned by the driver_connect() function.
TypeDB connection
The driver_connect()
function takes edition
, uri
, and the credentials as mandatory parameters.
/// Opens a connection to the TypeDB server at `uri` for the selected edition.
///
/// `Edition::Core` connects to a single address; `Edition::Cloud` passes the
/// address as a one-element list. The two editions differ only in the first
/// `DriverOptions::new` argument (presumably the TLS switch; confirm against
/// the driver documentation).
async fn driver_connect(
    edition: &Edition,
    uri: &str,
    username: impl AsRef<str>,
    password: impl AsRef<str>,
) -> Result<TypeDBDriver, typedb_driver::Error> {
    let credentials = Credentials::new(username.as_ref(), password.as_ref());
    match edition {
        Edition::Core => {
            let options = DriverOptions::new(false, None).unwrap();
            #[allow(clippy::let_and_return, reason = "tutorial readability")]
            let driver = TypeDBDriver::new_core(uri, credentials, options).await;
            driver
        }
        Edition::Cloud => {
            let options = DriverOptions::new(true, None).unwrap();
            #[allow(clippy::let_and_return, reason = "tutorial readability")]
            let driver = TypeDBDriver::new_cloud(&vec![uri], credentials, options).await;
            driver
        }
    }
}
The edition
is expected to be an Enum for selecting a TypeDB edition.
Depending on the TypeDB edition selected, this function initializes either a TypeDB Community Edition or a TypeDB Cloud / Enterprise connection.
TypeDB connections require objects of the Credentials
(authentication credentials) and DriverOptions
(driver-specific connection options like TLS settings) classes.
For our sample application, we have suitable default values set for all editions.
Database setup
To set up a TypeDB database, we need to make sure that it exists and has the correct schema and data. First, we check whether a database with the provided name already exists on the server.
If such a database doesn’t exist, we create a new database, define its schema, and load initial data.
To prevent data loss, avoid deleting an existing database without confirmation from a user.
If a database with the specified name already exists, we check whether we need to replace it.
To do so, we check the db_reset
parameter, and, if it’s false
, ask for an input from a user.
If either of the two indicates that replacing the database is acceptable, we replace the database by deleting the existing one and then creating a new one.
As the final step of the database setup, we test it.
/// Makes sure the database `db_name` exists and holds the expected data.
///
/// A missing database is created. An existing one is replaced when `db_reset`
/// is `true` or when the user answers "y" (case-insensitive) to the prompt;
/// otherwise it is reused. The result of `validate_data()` is returned.
async fn db_setup(driver: &TypeDBDriver, db_name: &str, db_reset: bool) -> Result<bool, Box<dyn Error>> {
    println!("Setting up the database: {}", db_name);
    if driver.databases().contains(db_name).await? {
        let replace = if db_reset {
            true
        } else {
            print!("Found a pre-existing database. Do you want to replace it? (Y/N) ");
            io::stdout().flush()?;
            // A closed stdin (no line to read) counts as "no" instead of panicking;
            // a read error is propagated to the caller.
            let answer = io::stdin().lock().lines().next().transpose()?.unwrap_or_default();
            answer.trim().eq_ignore_ascii_case("y")
        };
        if replace {
            replace_database(driver, db_name).await?;
        } else {
            println!("Reusing an existing database.");
        }
    } else {
        // No such database found on the server
        create_database(driver, db_name).await?;
    }
    validate_data(driver, db_name).await
}
Creating a new database
We create a new database with the specified name (sample_app_db
by default) and call functions to define its schema and load initial data.
/// Creates the database `db_name`, then defines its schema from `schema.tql`
/// and loads the initial data from `data_small_single_query.tql`.
///
/// Returns `Ok(true)` once all three steps have succeeded.
async fn create_database(driver: &TypeDBDriver, db_name: impl AsRef<str>) -> Result<bool, Box<dyn Error>> {
    let name = db_name.as_ref();
    print!("Creating a new database...");
    if let Err(err) = driver.databases().create(name).await {
        return Err(Box::new(TypeDBError::Other(format!("Failed to create a DB, due to: {}", err))));
    }
    println!("OK");
    db_schema_setup(driver, name, "schema.tql").await?;
    db_dataset_setup(driver, name, "data_small_single_query.tql").await?;
    Ok(true)
}
Replacing a database
We delete a database with the specified name (sample_app_db
by default) and call a function to create a new one instead:
/// Deletes the existing database `db_name` and creates it again from scratch.
///
/// Returns `Ok(true)` on success; deletion and creation failures are reported
/// with separate error messages.
async fn replace_database(driver: &TypeDBDriver, db_name: &str) -> Result<bool, Box<dyn Error>> {
    print!("Deleting an existing database...");
    let database = driver.databases().get(db_name).await?;
    if let Err(err) = database.delete().await {
        return Err(Box::new(TypeDBError::Other(format!("Failed to delete a database, due to: {}", err))));
    }
    println!("OK");
    match create_database(driver, db_name).await {
        Ok(_) => Ok(true),
        Err(err) => Err(Box::new(TypeDBError::Other(format!("Failed to create a new database, due to: {}", err)))),
    }
}
Defining a schema
We use a define query to define a schema for the newly created database:
async fn db_schema_setup(driver: &TypeDBDriver, db_name: &str, schema_file_path: &str) -> Result<(), TypeDBError> {
let tx = driver.transaction(db_name, TransactionType::Schema).await?;
let schema_query = fs::read_to_string(schema_file_path)
.map_err(|err| TypeDBError::Other(format!("Error loading file content from '{schema_file_path}', due to: {}", err)))?;
print!("Defining schema...");
let response = tx.query(&schema_query).await?;
assert!(response.is_ok());
tx.commit().await?;
println!("OK");
Ok(())
}
The schema for the sample application is stored in the schema.tql file.
See the full schema
# Schema of the sample application: type definitions.
define
# --- Content and pages -------------------------------------------------
# Abstract root of all content; every instance is keyed by an id.
entity content @abstract,
owns id @key;
entity page @abstract, sub content,
owns page-id,
owns name,
owns bio,
owns profile-picture,
plays posting:page,
plays following:page;
entity profile @abstract, sub page,
owns username,
owns name @card(0..3),
plays group-membership:member,
plays location:located,
plays content-engagement:author,
plays following:follower;
# The user type queried by the sample application (email, phone, family relations).
entity user sub profile,
owns email,
owns phone @regex("^\d{8,15}$") @unique,
owns karma,
owns relationship-status,
plays friendship:friend,
plays family:relative,
plays relationship:partner,
plays marriage:spouse,
plays employment:employee;
# --- Social relations ----------------------------------------------------
relation social-relation @abstract,
relates related @card(0..);
relation friendship sub social-relation,
relates friend as related @card(0..);
relation family sub social-relation,
relates relative as related @card(0..1000);
relation relationship sub social-relation,
relates partner as related,
owns start-date;
relation marriage sub relationship,
relates spouse as partner,
owns exact-date,
plays location:located;
# --- Organisations and employment ----------------------------------------
entity organisation sub profile,
owns tag @card(0..100),
plays employment:employer;
entity company sub organisation;
entity charity sub organisation;
relation employment,
relates employer,
relates employee,
owns start-date,
owns end-date;
# --- Groups ---------------------------------------------------------------
entity group sub page,
owns group-id,
owns tag @card(0..100),
plays group-membership:group;
relation group-membership,
relates group,
relates member,
owns start-timestamp,
owns end-timestamp;
# --- Posts and comments ---------------------------------------------------
entity post @abstract, sub content,
owns post-id,
owns post-text,
owns creation-timestamp @range(1970-01-01T00:00:00..),
owns tag @card(0..10),
plays posting:post,
plays commenting:parent,
plays reaction:parent,
plays location:located;
entity text-post sub post;
entity image-post sub post,
owns post-image;
entity comment sub content,
owns comment-id,
owns comment-text,
owns creation-timestamp,
owns tag @card(0..5),
plays commenting:comment,
plays commenting:parent,
plays reaction:parent;
# --- Interactions ---------------------------------------------------------
relation interaction @abstract,
relates subject @abstract,
relates content;
relation content-engagement @abstract, sub interaction,
relates author as subject;
relation posting sub content-engagement,
relates page as content,
relates post @card(0..1000);
relation commenting sub content-engagement,
relates parent as content,
relates comment;
relation reaction sub content-engagement,
relates parent as content,
owns emoji @values("like", "love", "funny", "surprise", "sad", "angry"),
owns creation-timestamp;
relation following,
relates follower,
relates page;
# --- Places and locations -------------------------------------------------
entity place,
owns place-id,
owns name,
plays location:place;
entity country sub place,
plays city-location:parent;
entity city sub place,
plays city-location:city;
relation location,
relates place,
relates located;
relation city-location sub location,
relates parent as place,
relates city as located;
# --- Attributes: identifiers (all string-valued via the abstract id) ------
attribute id @abstract, value string;
attribute page-id @abstract, sub id;
attribute username sub page-id;
attribute group-id sub page-id;
attribute post-id sub id;
attribute comment-id sub id;
attribute place-id sub id;
# --- Attributes: profile data ---------------------------------------------
attribute name value string;
attribute email value string @regex("^.*@\w+\.\w+$");
attribute phone value string;
attribute karma value double;
attribute relationship-status value string @values("single", "married", "other");
attribute latitude value double;
attribute longitude value double;
# --- Attributes: dates ----------------------------------------------------
attribute event-date @abstract, value datetime;
attribute start-date sub event-date;
attribute end-date sub event-date;
attribute exact-date @abstract, sub event-date;
# --- Attributes: text and image payloads ----------------------------------
attribute payload @abstract, value string;
attribute text-payload @abstract, sub payload;
attribute image-payload @abstract, sub payload;
attribute bio sub text-payload;
attribute comment-text sub text-payload;
attribute post-text sub text-payload;
attribute post-image sub image-payload;
attribute profile-picture sub image-payload;
# --- Attributes: tags, reactions and timestamps ---------------------------
attribute tag value string;
attribute emoji value string;
attribute creation-timestamp value datetime;
attribute start-timestamp value datetime;
attribute end-timestamp value datetime;
# Returns a stream of users related to $user through `family` relations:
# either directly, or through any user that all_relatives($user) itself returns
# (recursive step). The function does not exclude $user from its own result;
# the sample application's query filters it out with `not { $u is $relative; }`.
fun all_relatives($user: user) -> { user }:
match
$relative isa user;
{
family (relative: $user, relative: $relative);
} or {
let $intermediate in all_relatives($user);
family (relative: $intermediate, relative: $relative);
};
return { $relative };
We use a database name passed as a parameter to open a transaction. Then we send the contents of the file as a TypeQL define query and commit the changes made by the transaction.
Loading initial data
With the schema defined, we can load initial data into our database with the insert query:
async fn db_dataset_setup(driver: &TypeDBDriver, db_name: &str, data_file_path: &str) -> Result<(), Box<dyn Error>> {
let tx = driver.transaction(db_name, TransactionType::Write).await?;
let data = fs::read_to_string(data_file_path)
.map_err(|err| TypeDBError::Other(format!("Error loading file content from '{data_file_path}', due to: {}", err)))?;
print!("Loading data...");
let response = tx.query(&data).await?;
assert!(response.is_row_stream());
let results = response.into_rows().try_collect::<Vec<_>>().await?;
assert!(results.len() > 0);
tx.commit().await?;
println!("OK");
Ok(())
}
We read the data_small_single_query.tql file, send its contents as a single query, and then commit the changes.
See the full insert query
# Sample dataset: three users and two family relations, inserted by one query.
insert
$u1 isa user,
has username "masako-holley",
has phone "185800100011",
has email "masako.holley@typedb.com";
$u2 isa user,
has username "pearle-goodman",
has phone "171255522222",
has email "pearle.goodman@typedb.com";
$u3 isa user,
has username "kevin-morrison",
has phone "110000000",
has email "kevin.morrison@typedb.com";
# $u2 is directly related to both $u1 and $u3; $u1 and $u3 are linked only through $u2.
$relatives1 isa family (relative: $u1, relative: $u2);
$relatives2 isa family (relative: $u2, relative: $u3);
Testing a database
With the schema defined and data loaded, we test our database to make sure it’s ready. To test the database, we send a query to count the number of users in the database:
/// Checks that the database holds exactly the three users of the sample dataset.
///
/// Returns `Ok(true)` when the count matches and an error otherwise.
async fn validate_data(driver: &TypeDBDriver, db_name: &str) -> Result<bool, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Read).await?;
    print!("Validating the dataset...");
    let response = tx.query("match $u isa user; reduce $count = count;").await?;
    assert!(response.is_row_stream());
    // Only the first row of the answer is read; it carries the `count` value.
    let mut rows = response.into_rows();
    let row = rows.next().await.unwrap()?;
    let count = row.get("count").unwrap().try_get_integer().unwrap();
    if count != 3 {
        return Err(Box::new(TypeDBError::Other(format!("Validation failed, unexpected number of users: {}. Terminating...", count))));
    }
    println!("OK");
    Ok(true)
}
Query examples
After database setup is complete, we proceed with querying our database with different types of queries in the
queries()
function:
/// Runs the six sample requests in order: fetch, insert, two relative lookups,
/// update and delete.
///
/// Each step asserts the number of results expected for the sample dataset;
/// the first failing request aborts the sequence with its error.
async fn queries(driver: &TypeDBDriver, db_name: &str) -> Result<(), Box<dyn Error>> {
    println!("\nRequest 1 of 6: Fetch all users as JSON objects with emails and phone numbers");
    let users = fetch_all_users(driver, db_name).await?;
    assert_eq!(users.len(), 3);

    let new_user_phone = "17778889999";
    let new_user_email = "k.koolidge@typedb.com";
    let new_user_username = "k-koolidge";
    println!("\nRequest 2 of 6: Add a new user with the email {} and phone {}", new_user_email, new_user_phone);
    let new_user = insert_new_user(driver, db_name, new_user_email, new_user_phone, new_user_username).await?;
    assert_eq!(new_user.len(), 1);

    let kevin_email = "kevin.morrison@typedb.com";
    println!("\nRequest 3 of 6: Find direct relatives of a user with email {}", kevin_email);
    let direct_relatives = get_direct_relatives_by_email(driver, db_name, kevin_email).await?;
    assert_eq!(direct_relatives.len(), 1);

    println!("\nRequest 4 of 6: Transitively find all relatives of a user with email {}", kevin_email);
    let all_relatives = get_all_relatives_by_email(driver, db_name, kevin_email).await?;
    assert_eq!(all_relatives.len(), 2);

    let old_kevin_phone = "110000000";
    let new_kevin_phone = "110000002";
    println!("\nRequest 5 of 6: Update the phone of a user with email {} from {} to {}", kevin_email, old_kevin_phone, new_kevin_phone);
    let updated_users = update_phone_by_email(driver, db_name, kevin_email, old_kevin_phone, new_kevin_phone).await?;
    assert_eq!(updated_users.len(), 1);

    // The user added in request 2 is removed again at the end.
    println!("\nRequest 6 of 6: Delete the user with email {}", new_user_email);
    delete_user_by_email(driver, db_name, new_user_email).await
}
The queries are as follows:
-
Fetch query — to retrieve information in a JSON format
-
Insert query — to insert new data into the database
-
Match query — to retrieve data from the database as rows
-
Match query with a function call — to retrieve data from the database as rows using functions
-
Update query — to replace data in the database
-
Delete query — to delete data from the database
Every query is implemented as a function that includes some output of the query response and returns some meaningful data.
Fetch query
Fetching allows you to retrieve data from a TypeDB database as JSON documents.
Let’s use a fetch pipeline to fetch phone
s and email
s for all user
s in the database:
async fn fetch_all_users(driver: &TypeDBDriver, db_name: &str) -> Result<Vec<JSON>, Box<dyn Error>> {
let tx = driver.transaction(db_name, TransactionType::Read).await?;
let response = tx.query("match $u isa user; fetch { 'phone': $u.phone, 'email': $u.email };").await?;
let documents = response.into_documents().try_collect::<Vec<_>>().await?;
let mut documents_json = vec![];
for (index, document) in documents.into_iter().enumerate() {
let as_json = document.into_json();
println!("User #{}: {}", index, &as_json);
documents_json.push(as_json);
}
if documents_json.len() > 0 {
Ok(documents_json)
} else {
Err(Box::new(TypeDBError::Other("Error: No users found in a database.".to_string())))
}
}
We get the response as a stream of results, containing JSONs (ConceptDocument
s).
We collect the stream into a vector, convert every document to JSON, and then print and store the resulting JSONs.
Insert query
Let’s insert a new user
with username
, phone
, and email
attributes to the database.
async fn insert_new_user(
driver: &TypeDBDriver,
db_name: &str,
new_email: &str,
new_phone: &str,
new_username: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
let tx = driver.transaction(db_name, TransactionType::Write).await?;
let response = tx.query(&format!(
"insert $u isa user, has $e, has $p, has $username; $e isa email '{}'; $p isa phone '{}'; $username isa username '{}';",
new_email, new_phone, new_username
)).await?;
let rows = response.into_rows().try_collect::<Vec<_>>().await?;
for row in &rows {
let email = row.get("e").unwrap().try_get_string().unwrap();
let phone = row.get("p").unwrap().try_get_string().unwrap();
println!("Added new user. Phone: {}, E-mail: {}", phone, email);
}
if rows.len() > 0 {
tx.commit().await?;
Ok(rows)
} else {
Err(Box::new(TypeDBError::Other("Error: No new users created.".to_string())))
}
}
The insert query returns a stream of ConceptRow
s: one for every insert
clause execution.
We collect the stream to a vector to store the inserted data.
Then we print phone
s and email
s by iterating through the vector of ConceptRow
s, commit the changes, and return the stored vector.
Since the insert query has no match
clause, the insert
clause is executed exactly once.
Insert queries always return ConceptRow
s, which represent an inserted result: column names (variable names) and their respective concepts.
Match query
If we don’t need to convert the result into a JSON document, we can use a single match stage, which returns ConceptRow
s similarly to insert.
Let’s retrieve all direct relatives for a user
using its email
.
/// Finds the direct relatives of the user with the given e-mail and prints their usernames.
///
/// Fails unless exactly one user owns the e-mail. Returns one row per relative,
/// each exposing the `username` variable, sorted by username.
async fn get_direct_relatives_by_email(
    driver: &TypeDBDriver,
    db_name: &str,
    email: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Read).await?;

    // The e-mail has to identify exactly one user before we look for relatives.
    let user_query = format!("match $u isa user, has email '{}';", email);
    let matched_users = tx.query(&user_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    let user_count = matched_users.len();
    if user_count != 1 {
        return Err(Box::new(TypeDBError::Other(format!("Found {} users with email {}, expected 1.", user_count, email))));
    }

    let relatives_query = format!(
        "match
$e == '{}';
$u isa user, has email $e;
$family isa family ($u, $relative);
$relative has username $username;
not {{ $u is $relative; }};
select $username;
sort $username asc;
",
        email
    );
    let relatives = tx.query(&relatives_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    for (index, row) in relatives.iter().enumerate() {
        let username = row.get("username").unwrap().try_get_string().unwrap();
        println!("Relative #{}: {}", index + 1, username);
    }
    Ok(relatives)
}
The get_direct_relatives_by_email()
function checks that there is only one user
matched with the email
provided by an input parameter.
It then executes the query to find the relatives, collects the results, and iterates through them to print the username
of every matched relative.
For bigger numbers of results it might be faster to iterate through a stream, rather than collect and store the results first.
Match query with a function call
Let’s change the query a little to get not only direct relatives, but all relatives of a user
.
Additionally, let’s use a TypeDB function called all_relatives
that we previously defined in the schema.
/// Finds all relatives returned by the `all_relatives` schema function for the
/// user with the given e-mail and prints their usernames.
///
/// Fails unless exactly one user owns the e-mail. Returns one row per relative,
/// each exposing the `username` variable, sorted by username.
async fn get_all_relatives_by_email(
    driver: &TypeDBDriver,
    db_name: &str,
    email: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Read).await?;

    // The e-mail has to identify exactly one user before we look for relatives.
    let user_query = format!("match $u isa user, has email '{}';", email);
    let matched_users = tx.query(&user_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    let user_count = matched_users.len();
    if user_count != 1 {
        return Err(Box::new(TypeDBError::Other(format!("Found {} users with email {}, expected 1.", user_count, email))));
    }

    let relatives_query = format!(
        "match
$u isa user, has email $e;
$e == '{}';
let $relative in all_relatives($u);
not {{ $u is $relative; }};
$relative has username $username;
select $username;
sort $username asc;
",
        email
    );
    let relatives = tx.query(&relatives_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    for (index, row) in relatives.iter().enumerate() {
        let username = row.get("username").unwrap().try_get_string().unwrap();
        println!("Relative #{}: {}", index + 1, username);
    }
    Ok(relatives)
}
Update query
Let’s replace a phone
of one of the user
s by a new one.
We can do that by deleting ownership of the old phone attribute from the user entity and assigning it ownership of the new phone attribute:
/// Replaces `old_phone` with `new_phone` for the user with the given e-mail.
///
/// The match, delete and insert stages run as a single query in one write
/// transaction, which is committed before the number of updated rows is
/// printed. Returns the rows produced by the query.
async fn update_phone_by_email(
    driver: &TypeDBDriver,
    db_name: &str,
    email: &str,
    old_phone: &str,
    new_phone: &str,
) -> Result<Vec<ConceptRow>, Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Write).await?;
    let update_query = format!(
        "match $u isa user, has email '{email}', has phone $phone; $phone == '{old_phone}';
delete $phone of $u;
insert $u has phone '{new_phone}';"
    );
    let updated = tx.query(&update_query).await?.into_rows().try_collect::<Vec<_>>().await?;
    tx.commit().await?;
    println!("Total number of users updated: {}", updated.len());
    Ok(updated)
}
Here, you can notice how both delete
and insert
pipeline stages reuse the same variable from the first match
stage.
Executing both write stages in a single transaction isolates these changes from other transactions: there won’t be any point of time for other TypeDB users where they won’t see a user
s phone
.
Moreover, if any other transaction makes a conflicting change before we commit this transaction, then our transaction fails upon a commit.
Delete query
Finally, let’s delete a user
by a given email
.
It can be simply done by a single statement inside a match
, and a very short delete
operation.
/// Deletes every `user` that owns the given e-mail and prints how many were deleted.
///
/// The deletion runs in a write transaction that is committed before the
/// function returns, so the change is persisted.
async fn delete_user_by_email(driver: &TypeDBDriver, db_name: &str, email: &str) -> Result<(), Box<dyn Error>> {
    let tx = driver.transaction(db_name, TransactionType::Write).await?;
    let rows = tx.query(&format!(
        "match $u isa user, has email '{email}';
delete $u;"
    ))
        .await?
        .into_rows()
        .try_collect::<Vec<_>>()
        .await?;
    // Without a commit the deletion is never persisted: the user would still
    // be present the next time the database is opened.
    tx.commit().await?;
    println!("Deleted {} users", rows.len());
    Ok(())
}