TypeDB 3.0 is live! Get started for free.

Python driver tutorial

In this tutorial, we’ll build a sample application with the Python driver capable of basic interaction with TypeDB:

  • Connect to a TypeDB server,

  • Manage databases and transactions,

  • Send different types of queries.

Follow the steps below or see the full source code.

See the full source code
from typedb.driver import TypeDB, TransactionType, Credentials, DriverOptions
from enum import Enum


DB_NAME = "sample_app_db"  # name of the database the sample application works with
SERVER_ADDR = "127.0.0.1:1729"  # address of the TypeDB server to connect to

class Edition(Enum):
    """Selects which driver constructor driver_connect() uses."""
    Cloud = 1
    Core = 2

TYPEDB_EDITION = Edition.Core  # edition passed to driver_connect() by main()
USERNAME = "admin"  # default credentials used by driver_connect()
PASSWORD = "password"


def create_database(driver, db_name) -> bool:
    """Create the database ``db_name`` and fill it with the schema and sample data.

    Progress is printed to stdout. Nothing is caught here, so any error raised
    by the driver or by the setup helpers propagates to the caller.

    Returns:
        Always ``True`` once every step has completed.
    """
    print("Creating a new database", end="...")
    driver.databases.create(db_name)
    print("OK")
    # The schema has to be defined before the data that relies on it is loaded.
    for setup_step in (db_schema_setup, db_dataset_setup):
        setup_step(driver, db_name)
    return True


def replace_database(driver, db_name) -> bool:
    """Delete the database ``db_name`` and build it again from scratch.

    Returns:
        ``True`` if ``create_database()`` reported success, ``False`` otherwise.
    """
    print("Deleting an existing database", end="...")
    existing_db = driver.databases.get(db_name)
    existing_db.delete()
    print("OK")
    if not create_database(driver, db_name):
        print("Failed to create a new database. Terminating...")
        return False
    return True


def db_schema_setup(driver, db_name, schema_file='schema.tql'):
    """Define the schema of ``db_name`` from a TypeQL file.

    The whole content of ``schema_file`` is sent as one query inside a schema
    transaction, which is then committed.
    """
    with open(schema_file, 'r') as schema_source:
        schema_text = schema_source.read()
    with driver.transaction(db_name, TransactionType.SCHEMA) as schema_tx:
        print("Defining schema", end="...")
        pending = schema_tx.query(schema_text)
        pending.resolve()
        schema_tx.commit()
        print("OK")


def db_dataset_setup(driver, db_name, data_file='data_small_single_query.tql'):
    """Load the initial data of ``db_name`` from a TypeQL file.

    The whole content of ``data_file`` is sent as one query inside a write
    transaction, which is then committed.
    """
    with open(data_file, 'r') as data_source:
        data_text = data_source.read()
    with driver.transaction(db_name, TransactionType.WRITE) as write_tx:
        print("Loading data", end="...")
        pending = write_tx.query(data_text)
        pending.resolve()
        write_tx.commit()
        print("OK")


def validate_data(driver, db_name) -> bool:
    """Check that ``db_name`` holds exactly three ``user`` instances.

    Returns:
        ``True`` when the count query yields 3, ``False`` otherwise.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        count_query = "match $u isa user; reduce $count = count;"
        print("Testing the dataset", end="...")
        rows = tx.query(count_query).resolve().as_concept_rows()
        # Only the first row of the answer is read; it carries the count.
        first_row = next(rows)
        user_count = first_row.get("count").try_get_integer()
        if user_count != 3:
            print("Validation failed, unexpected number of users:", user_count, "\n Expected result: 3. Terminating...")
            return False
        print("Passed")
        return True


def db_setup(driver, db_name, db_reset=False) -> bool:
    """Make sure ``db_name`` exists on the server and passes validation.

    A missing database is created. An existing one is replaced when
    ``db_reset`` is true or when the user answers "y" (in any case) to the
    prompt; otherwise it is reused unchanged.

    Returns:
        The result of ``validate_data()``, or ``False`` if creating or
        replacing the database failed or the database cannot be found.
    """
    print(f"Setting up the database: {db_name}")
    if not driver.databases.contains(db_name):
        # No such database found on the server
        if not create_database(driver, db_name):
            print("Failed to create a new database. Terminating...")
            return False
    elif db_reset or input("Found a pre-existing database. Do you want to replace it? (Y/N) ").lower() == "y":
        if not replace_database(driver, db_name):
            return False
    else:
        print("Reusing an existing database.")

    # Re-check on the server before validating.
    if not driver.databases.contains(db_name):
        print("Database not found. Terminating...")
        return False
    return validate_data(driver, db_name)


def fetch_all_users(driver, db_name) -> list:
    """Fetch the phones and emails of every user as documents.

    Each document is printed together with its zero-based position.

    Returns:
        The list of documents produced by the fetch query.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        query = "match $u isa user; fetch { 'phone': $u.phone, 'email': $u.email };"
        result = tx.query(query).resolve()
        documents = list(result.as_concept_documents())
        for position, document in enumerate(documents):
            print(f"JSON #{position}: {document}")
        return documents


def insert_new_user(driver, db_name, email, phone, username) -> list:
    """Insert a user owning the given email, phone and username.

    NOTE: the values are placed into the query text unescaped.

    Returns:
        The list of concept rows answered by the insert query (collected
        before the transaction is committed).
    """
    with driver.transaction(db_name, TransactionType.WRITE) as tx:
        query = f"""
        insert 
          $u isa user, has $e, has $p, has $username; 
          $e isa email '{email}'; 
          $p isa phone '{phone}'; 
          $username isa username '{username}';
        """
        inserted_rows = list(tx.query(query).resolve().as_concept_rows())
        tx.commit()
        for inserted in inserted_rows:
            # Report the values as they come back in the answer row.
            stored_phone = inserted.get("p").try_get_string()
            stored_email = inserted.get("e").try_get_string()
            print(f"Added new user. Phone: {stored_phone}, E-mail: {stored_email}")
        return inserted_rows


def get_direct_relatives_by_email(driver, db_name, email):
    """Print and return the direct relatives of the user owning ``email``.

    Direct relatives are the other users taking part in a ``family`` relation
    with that user.

    Returns:
        A list of concept rows (one ``username`` per relative, sorted
        ascending), or ``None`` when ``email`` does not match exactly one user.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        lookup = tx.query(f"match $u isa user, has email '{email}';").resolve()
        matched_users = len(list(lookup.as_concept_rows()))
        if matched_users != 1:
            print(f"Error: Found {matched_users} users, expected 1.")
            return None

        relatives_query = f"""
              match 
                $e == '{email}';
                $u isa user, has email $e;
                $family isa family ($u, $relative);
                $relative has username $username;
                not {{ $u is $relative; }};
              select $username;
              sort $username asc;
            """
        relatives = list(tx.query(relatives_query).resolve().as_concept_rows())
        for relative in relatives:
            print(f"Relative: {relative.get('username').try_get_string()}")
        if not relatives:
            print("No relatives found.")
        return relatives


def get_all_relatives_by_email(driver, db_name, email):
    """Print and return all relatives of the user owning ``email``.

    Relatives are obtained through the ``all_relatives`` function called in
    the query; the user itself is excluded from the result.

    Returns:
        A list of concept rows (one ``username`` per relative, sorted
        ascending), or ``None`` when ``email`` does not match exactly one user.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        lookup = tx.query(f"match $u isa user, has email '{email}';").resolve()
        matched_users = len(list(lookup.as_concept_rows()))
        if matched_users != 1:
            print(f"Error: Found {matched_users} users, expected 1.")
            return None

        relatives_query = f"""
              match 
                $u isa user, has email $e;
                $e == '{email}';
                let $relative in all_relatives($u);
                not {{ $u is $relative; }};
                $relative has username $username;
                select $username;
                sort $username asc;
            """
        relatives = list(tx.query(relatives_query).resolve().as_concept_rows())
        for relative in relatives:
            print(f"Relative: {relative.get('username').try_get_string()}")
        if not relatives:
            print("No relatives found.")
        return relatives


def update_phone_by_email(driver, db_name, email, old, new):
    """Replace the phone ``old`` with ``new`` for the user owning ``email``.

    The transaction is committed even when nothing matched.

    Returns:
        The list of concept rows answered by the update query, or ``None``
        when it is empty.
    """
    with driver.transaction(db_name, TransactionType.WRITE) as tx:
        update_query = f"""
          match $u isa user, has email '{email}', has phone $phone; $phone == '{old}';
          update $u has phone '{new}';
        """
        updated_rows = list(tx.query(update_query).resolve().as_concept_rows())
        tx.commit()
        if not updated_rows:
            print("Error: No phones updated")
            return None
        print(f"Total number of phones updated: {len(updated_rows)}")
        return updated_rows


def delete_user_by_email(driver, db_name, email):
    """Delete every user owning ``email``.

    The transaction is committed even when nothing matched.

    Returns:
        The list of concept rows answered by the delete query, or ``None``
        when it is empty.
    """
    with driver.transaction(db_name, TransactionType.WRITE) as tx:
        delete_query = f"match $u isa user, has email '{email}'; delete $u;"
        deleted_rows = list(tx.query(delete_query).resolve().as_concept_rows())
        tx.commit()
        if not deleted_rows:
            print("Error: No users deleted")
            return None
        print(f"Total number of users deleted: {len(deleted_rows)}")
        return deleted_rows


def driver_connect(edition, uri, username=USERNAME, password=PASSWORD):
    """Open a driver connection for the selected TypeDB edition.

    Args:
        edition: An ``Edition`` member selecting the driver constructor.
        uri: Address of the server; for ``Edition.Cloud`` it is passed as a
            one-element list.
        username: User name for the credentials.
        password: Password for the credentials.

    Returns:
        The driver object created by ``TypeDB.core_driver()`` or
        ``TypeDB.cloud_driver()``.

    Raises:
        ValueError: If ``edition`` is neither ``Edition.Core`` nor
            ``Edition.Cloud``. Previously this case returned ``None``, which
            only failed later when the result was used.
    """
    credentials = Credentials(username, password)
    # NOTE(review): the positional arguments look like "TLS disabled, no root
    # CA" - confirm against the DriverOptions documentation.
    options = DriverOptions(False, None)
    if edition is Edition.Core:
        return TypeDB.core_driver(uri, credentials, options)
    if edition is Edition.Cloud:
        return TypeDB.cloud_driver([uri], credentials, options)
    raise ValueError(f"Unsupported TypeDB edition: {edition!r}")


def queries(driver, db_name):
    """Run the six demo requests against ``db_name`` and check their results.

    Every request uses the ``db_name`` argument (the module-level ``DB_NAME``
    was used before, silently ignoring the parameter). The ``assert``
    statements are self-checks against the bundled sample dataset, not input
    validation.
    """
    print("\nRequest 1 of 6: Fetch all users as JSON objects with emails and phone numbers")
    users = fetch_all_users(driver, db_name)
    assert len(users) == 3

    new_phone = "17778889999"
    new_email = "jk@typedb.com"
    new_username = "k-koolidge"
    print(f"\nRequest 2 of 6: Add a new user with the email {new_email} and phone {new_phone}")
    insert_new_user(driver, db_name, new_email, new_phone, new_username)

    kevin_email = "kevin.morrison@typedb.com"
    print(f"\nRequest 3 of 6: Find direct relatives of a user with email {kevin_email}")
    relatives = get_direct_relatives_by_email(driver, db_name, kevin_email)
    assert relatives is not None
    assert len(relatives) == 1

    print(f"\nRequest 4 of 6: Transitively find all relatives of a user with email {kevin_email}")
    relatives = get_all_relatives_by_email(driver, db_name, kevin_email)
    assert relatives is not None
    assert len(relatives) == 2

    old_kevin_phone = "110000000"
    new_kevin_phone = "110000002"
    print(f"\nRequest 5 of 6: Update the phone of a user with email {kevin_email} from {old_kevin_phone} to {new_kevin_phone}")
    updated_users = update_phone_by_email(driver, db_name, kevin_email, old_kevin_phone, new_kevin_phone)
    assert updated_users is not None
    assert len(updated_users) == 1

    # Remove the user added in request 2.
    print(f'\nRequest 6 of 6: Delete the user with email "{new_email}"')
    delete_user_by_email(driver, db_name, new_email)


def main():
    """Connect to TypeDB, prepare the sample database and run the demo queries.

    Everything runs inside the driver's ``with`` block. If the database setup
    fails, a message is printed and the program exits.
    """
    with driver_connect(TYPEDB_EDITION, SERVER_ADDR) as driver:
        database_ready = db_setup(driver, DB_NAME, db_reset=False)
        if not database_ready:
            print("Terminating...")
            exit()
        queries(driver, DB_NAME)


# Run the sample only when the file is executed as a script.
if __name__ == "__main__":
    main()

Environment setup

To run this sample application, you’ll need:

  1. TypeDB: either a TypeDB Cloud cluster or a self-hosted deployment. For installation instructions, see the Installation manual page.

  2. Python and TypeDB Python driver. For the driver installation instructions, see the Python driver page.

Use pip for the Python driver installation:

pip install typedb-driver

Imported modules

To be able to use the TypeDB Python driver API in the Sample application, use the following import statements:

from typedb.driver import TypeDB, TransactionType, Credentials, DriverOptions
from enum import Enum

Default values

We store default values as constants in the source code:

DB_NAME = "sample_app_db"  # name of the database the sample application works with
SERVER_ADDR = "127.0.0.1:1729"  # address of the TypeDB server to connect to

class Edition(Enum):
    """Selects which driver constructor driver_connect() uses."""
    Cloud = 1
    Core = 2

TYPEDB_EDITION = Edition.Core  # edition passed to driver_connect() by main()
USERNAME = "admin"  # default credentials used by driver_connect()
PASSWORD = "password"

Here, DB_NAME is the name of the database to use; SERVER_ADDR is the address of the TypeDB server to connect to; TYPEDB_EDITION selects between TypeDB Community Edition and TypeDB Cloud; USERNAME and PASSWORD are the authentication credentials.

Program structure

The main workflow of this sample application includes establishing a connection to TypeDB, performing a new database setup, and querying.

def main():
    """Connect to TypeDB, prepare the sample database and run the demo queries.

    Everything runs inside the driver's ``with`` block. If the database setup
    fails, a message is printed and the program exits.
    """
    with driver_connect(TYPEDB_EDITION, SERVER_ADDR) as driver:
        database_ready = db_setup(driver, DB_NAME, db_reset=False)
        if not database_ready:
            print("Terminating...")
            exit()
        queries(driver, DB_NAME)

The entire body of the main() function is executed in the context of the network connection, represented by the driver object that is returned by the driver_connect() function.

TypeDB connection

The driver_connect() function takes edition and uri as mandatory parameters; username and password are optional and default to USERNAME and PASSWORD.

def driver_connect(edition, uri, username=USERNAME, password=PASSWORD):
    """Open a driver connection for the selected TypeDB edition.

    Args:
        edition: An ``Edition`` member selecting the driver constructor.
        uri: Address of the server; for ``Edition.Cloud`` it is passed as a
            one-element list.
        username: User name for the credentials.
        password: Password for the credentials.

    Returns:
        The driver object created by ``TypeDB.core_driver()`` or
        ``TypeDB.cloud_driver()``.

    Raises:
        ValueError: If ``edition`` is neither ``Edition.Core`` nor
            ``Edition.Cloud``. Previously this case returned ``None``, which
            only failed later when the result was used.
    """
    credentials = Credentials(username, password)
    # NOTE(review): the positional arguments look like "TLS disabled, no root
    # CA" - confirm against the DriverOptions documentation.
    options = DriverOptions(False, None)
    if edition is Edition.Core:
        return TypeDB.core_driver(uri, credentials, options)
    if edition is Edition.Cloud:
        return TypeDB.cloud_driver([uri], credentials, options)
    raise ValueError(f"Unsupported TypeDB edition: {edition!r}")

The edition is expected to be an Enum for selecting a TypeDB edition. Depending on the TypeDB edition selected, this function initializes either a TypeDB Community Edition or a TypeDB Cloud / Enterprise connection.

TypeDB connections require objects of the Credentials (authentication credentials) and DriverOptions (driver-specific connection options like TLS settings) classes. For our sample application, we have suitable default values set for all editions.

Database setup

To set up a TypeDB database, we need to make sure that it exists and has the correct schema and data. First, we check whether a database with the provided name already exists on the server.

If such a database doesn’t exist, we create a new database, define its schema, and load initial data.

To prevent data loss, avoid deleting an existing database without confirmation from a user.

If a database with the specified name already exists, we check whether we need to replace it. To do so, we check the db_reset parameter and, if it’s False, ask the user for input. If either of the two indicates that the database should be replaced, we replace it by deleting the existing database and then creating a new one.

As the final step of the database setup, we test it.

def db_setup(driver, db_name, db_reset=False) -> bool:
    """Make sure ``db_name`` exists on the server and passes validation.

    A missing database is created. An existing one is replaced when
    ``db_reset`` is true or when the user answers "y" (in any case) to the
    prompt; otherwise it is reused unchanged.

    Returns:
        The result of ``validate_data()``, or ``False`` if creating or
        replacing the database failed or the database cannot be found.
    """
    print(f"Setting up the database: {db_name}")
    if not driver.databases.contains(db_name):
        # No such database found on the server
        if not create_database(driver, db_name):
            print("Failed to create a new database. Terminating...")
            return False
    elif db_reset or input("Found a pre-existing database. Do you want to replace it? (Y/N) ").lower() == "y":
        if not replace_database(driver, db_name):
            return False
    else:
        print("Reusing an existing database.")

    # Re-check on the server before validating.
    if not driver.databases.contains(db_name):
        print("Database not found. Terminating...")
        return False
    return validate_data(driver, db_name)

Creating a new database

We create a new database with the specified name (sample_app_db by default) and call functions to define its schema and load initial data.

def create_database(driver, db_name) -> bool:
    """Create the database ``db_name`` and fill it with the schema and sample data.

    Progress is printed to stdout. Nothing is caught here, so any error raised
    by the driver or by the setup helpers propagates to the caller.

    Returns:
        Always ``True`` once every step has completed.
    """
    print("Creating a new database", end="...")
    driver.databases.create(db_name)
    print("OK")
    # The schema has to be defined before the data that relies on it is loaded.
    for setup_step in (db_schema_setup, db_dataset_setup):
        setup_step(driver, db_name)
    return True

Replacing a database

We delete a database with the specified name (sample_app_db by default) and call a function to create a new one instead:

def replace_database(driver, db_name) -> bool:
    """Delete the database ``db_name`` and build it again from scratch.

    Returns:
        ``True`` if ``create_database()`` reported success, ``False`` otherwise.
    """
    print("Deleting an existing database", end="...")
    existing_db = driver.databases.get(db_name)
    existing_db.delete()
    print("OK")
    if not create_database(driver, db_name):
        print("Failed to create a new database. Terminating...")
        return False
    return True

Defining a schema

We use a define query to define a schema for the newly created database:

def db_schema_setup(driver, db_name, schema_file='schema.tql'):
    """Define the schema of ``db_name`` from a TypeQL file.

    The whole content of ``schema_file`` is sent as one query inside a schema
    transaction, which is then committed.
    """
    with open(schema_file, 'r') as schema_source:
        schema_text = schema_source.read()
    with driver.transaction(db_name, TransactionType.SCHEMA) as schema_tx:
        print("Defining schema", end="...")
        pending = schema_tx.query(schema_text)
        pending.resolve()
        schema_tx.commit()
        print("OK")

The schema for the sample application is stored in the schema.tql file.

See the full schema
schema.tql
# Schema of the sample database: pages/profiles, social relations, content,
# places, their attributes, and the all_relatives function.
define
  # --- Entities: content / page / profile hierarchy ---
  # Abstract root; every subtype owns a key `id`.
  entity content @abstract,
    owns id @key;

  entity page @abstract, sub content,
    owns page-id,
    owns name,
    owns bio,
    owns profile-picture,
    plays posting:page,
    plays following:page;

  entity profile @abstract, sub page,
    owns username,
    owns name @card(0..3),
    plays group-membership:member,
    plays location:located,
    plays content-engagement:author,
    plays following:follower;

  # The entity type used by the sample application's queries.
  entity user sub profile,
    owns email,
    owns phone @regex("^\d{8,15}$") @unique,
    owns karma,
    owns relationship-status,
    plays friendship:friend,
    plays family:relative,
    plays relationship:partner,
    plays marriage:spouse,
    plays employment:employee;

  # --- Relations between users ---
  relation social-relation @abstract,
    relates related @card(0..);

  relation friendship sub social-relation,
    relates friend as related @card(0..);

  # Both ends of a family relation play the same `relative` role.
  relation family sub social-relation,
    relates relative as related @card(0..1000);

  relation relationship sub social-relation,
    relates partner as related,
    owns start-date;

  relation marriage sub relationship,
    relates spouse as partner,
    owns exact-date,
    plays location:located;

  # --- Organisations and groups ---
  entity organisation sub profile,
    owns tag @card(0..100),
    plays employment:employer;

  entity company sub organisation;
  entity charity sub organisation;

  relation employment,
    relates employer,
    relates employee,
    owns start-date,
    owns end-date;

  entity group sub page,
    owns group-id,
    owns tag @card(0..100),
    plays group-membership:group;

  relation group-membership,
    relates group,
    relates member,
    owns start-timestamp,
    owns end-timestamp;

  # --- Posts and comments ---
  entity post @abstract, sub content,
    owns post-id,
    owns post-text,
    owns creation-timestamp @range(1970-01-01T00:00:00..),
    owns tag @card(0..10),
    plays posting:post,
    plays commenting:parent,
    plays reaction:parent,
    plays location:located;

  entity text-post sub post;

  entity image-post sub post,
    owns post-image;

  entity comment sub content,
    owns comment-id,
    owns comment-text,
    owns creation-timestamp,
    owns tag @card(0..5),
    plays commenting:comment,
    plays commenting:parent,
    plays reaction:parent;

  # --- Interactions with content ---
  relation interaction @abstract,
    relates subject @abstract,
    relates content;

  relation content-engagement @abstract, sub interaction,
    relates author as subject;

  relation posting sub content-engagement,
    relates page as content,
    relates post @card(0..1000);

  relation commenting sub content-engagement,
    relates parent as content,
    relates comment;

  relation reaction sub content-engagement,
    relates parent as content,
    owns emoji @values("like", "love", "funny", "surprise", "sad", "angry"),
    owns creation-timestamp;

  relation following,
    relates follower,
    relates page;

  # --- Places and locations ---
  entity place,
    owns place-id,
    owns name,
    plays location:place;

  entity country sub place,
    plays city-location:parent;

  entity city sub place,
    plays city-location:city;

  relation location,
    relates place,
    relates located;

  relation city-location sub location,
    relates parent as place,
    relates city as located;

  # --- Attributes: identifiers (all string-valued via `id`) ---
  attribute id @abstract, value string;
  attribute page-id @abstract, sub id;
  attribute username sub page-id;
  attribute group-id sub page-id;
  attribute post-id sub id;
  attribute comment-id sub id;
  attribute place-id sub id;

  # --- Attributes: profile data ---
  attribute name value string;
  attribute email value string @regex("^.*@\w+\.\w+$");
  attribute phone value string;
  attribute karma value double;
  attribute relationship-status value string @values("single", "married", "other");
  attribute latitude value double;
  attribute longitude value double;

  # --- Attributes: dates ---
  attribute event-date @abstract, value datetime;
  attribute start-date sub event-date;
  attribute end-date sub event-date;
  attribute exact-date @abstract, sub event-date;

  # --- Attributes: text and image payloads ---
  attribute payload @abstract, value string;
  attribute text-payload @abstract, sub payload;
  attribute image-payload @abstract, sub payload;
  attribute bio sub text-payload;
  attribute comment-text sub text-payload;
  attribute post-text sub text-payload;
  attribute post-image sub image-payload;
  attribute profile-picture sub image-payload;

  attribute tag value string;
  attribute emoji value string;

  attribute creation-timestamp value datetime;
  attribute start-timestamp value datetime;
  attribute end-timestamp value datetime;

  # Returns the users connected to $user through one or more family relations:
  # either directly, or through an intermediate relative found by a recursive
  # call. The sample queries that call it filter out $user itself with
  # `not { $u is $relative; }`.
  fun all_relatives($user: user) -> { user }:
    match
      $relative isa user;
      {
        family (relative: $user, relative: $relative);
      } or {
        let $intermediate in all_relatives($user);
        family (relative: $intermediate, relative: $relative);
      };
      return { $relative };

We use a database name passed as a parameter to open a transaction. Then we send the contents of the file as a TypeQL define query and commit the changes made by the transaction.

Loading initial data

With the schema defined, we can load initial data into our database with the insert query:

def db_dataset_setup(driver, db_name, data_file='data_small_single_query.tql'):
    """Load the initial data of ``db_name`` from a TypeQL file.

    The whole content of ``data_file`` is sent as one query inside a write
    transaction, which is then committed.
    """
    with open(data_file, 'r') as data_source:
        data_text = data_source.read()
    with driver.transaction(db_name, TransactionType.WRITE) as write_tx:
        print("Loading data", end="...")
        pending = write_tx.query(data_text)
        pending.resolve()
        write_tx.commit()
        print("OK")

We read the data_small_single_query.tql file, send its contents as a single query, and then commit the changes.

See the full insert query
data_small_single_query.tql
# Sample dataset: three users and two family relations, inserted by one query.
insert
    $u1 isa user,
        has username "masako-holley",
        has phone "185800100011",
        has email "masako.holley@typedb.com";

    $u2 isa user,
        has username "pearle-goodman",
        has phone "171255522222",
        has email "pearle.goodman@typedb.com";

    $u3 isa user,
        has username "kevin-morrison",
        has phone "110000000",
        has email "kevin.morrison@typedb.com";

    # $u1-$u2 and $u2-$u3 are directly related; $u1 and $u3 are connected
    # only through $u2.
    $relatives1 isa family (relative: $u1, relative: $u2);
    $relatives2 isa family (relative: $u2, relative: $u3);

Testing a database

With the schema defined and data loaded, we test our database to make sure it’s ready. To test the database, we send a query to count the number of users in the database:

def validate_data(driver, db_name) -> bool:
    """Check that ``db_name`` holds exactly three ``user`` instances.

    Returns:
        ``True`` when the count query yields 3, ``False`` otherwise.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        count_query = "match $u isa user; reduce $count = count;"
        print("Testing the dataset", end="...")
        rows = tx.query(count_query).resolve().as_concept_rows()
        # Only the first row of the answer is read; it carries the count.
        first_row = next(rows)
        user_count = first_row.get("count").try_get_integer()
        if user_count != 3:
            print("Validation failed, unexpected number of users:", user_count, "\n Expected result: 3. Terminating...")
            return False
        print("Passed")
        return True

Query examples

After database setup is complete, we proceed with querying our database with different types of queries in the queries() function:

def queries(driver, db_name):
    """Run the six demo requests against ``db_name`` and check their results.

    Every request uses the ``db_name`` argument (the module-level ``DB_NAME``
    was used before, silently ignoring the parameter). The ``assert``
    statements are self-checks against the bundled sample dataset, not input
    validation.
    """
    print("\nRequest 1 of 6: Fetch all users as JSON objects with emails and phone numbers")
    users = fetch_all_users(driver, db_name)
    assert len(users) == 3

    new_phone = "17778889999"
    new_email = "jk@typedb.com"
    new_username = "k-koolidge"
    print(f"\nRequest 2 of 6: Add a new user with the email {new_email} and phone {new_phone}")
    insert_new_user(driver, db_name, new_email, new_phone, new_username)

    kevin_email = "kevin.morrison@typedb.com"
    print(f"\nRequest 3 of 6: Find direct relatives of a user with email {kevin_email}")
    relatives = get_direct_relatives_by_email(driver, db_name, kevin_email)
    assert relatives is not None
    assert len(relatives) == 1

    print(f"\nRequest 4 of 6: Transitively find all relatives of a user with email {kevin_email}")
    relatives = get_all_relatives_by_email(driver, db_name, kevin_email)
    assert relatives is not None
    assert len(relatives) == 2

    old_kevin_phone = "110000000"
    new_kevin_phone = "110000002"
    print(f"\nRequest 5 of 6: Update the phone of a user with email {kevin_email} from {old_kevin_phone} to {new_kevin_phone}")
    updated_users = update_phone_by_email(driver, db_name, kevin_email, old_kevin_phone, new_kevin_phone)
    assert updated_users is not None
    assert len(updated_users) == 1

    # Remove the user added in request 2.
    print(f'\nRequest 6 of 6: Delete the user with email "{new_email}"')
    delete_user_by_email(driver, db_name, new_email)

The queries are as follows:

  1. Fetch query — to retrieve information in a JSON format

  2. Insert query — to insert new data into the database

  3. Match query — to retrieve data from the database as rows

  4. Match query with a function call — to retrieve data from the database as rows using functions

  5. Update query — to replace data in the database

  6. Delete query — to delete data from the database

Every query is implemented as a function that prints part of the query response and returns the results.

Fetch query

Fetching allows you to retrieve data from a TypeDB database as JSON documents.

Let’s use a fetch pipeline to fetch the phones and emails of all users in the database:

def fetch_all_users(driver, db_name) -> list:
    """Fetch the phones and emails of every user as documents.

    Each document is printed together with its zero-based position.

    Returns:
        The list of documents produced by the fetch query.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        query = "match $u isa user; fetch { 'phone': $u.phone, 'email': $u.email };"
        result = tx.query(query).resolve()
        documents = list(result.as_concept_documents())
        for position, document in enumerate(documents):
            print(f"JSON #{position}: {document}")
        return documents

We collect the response into a list, which the function returns. We iterate through the list and print every JSON document (dict) in it. You can also use each result as a standard Python dict with keys and values.

Insert query

Let’s insert a new user with username, phone, and email attributes to the database.

def insert_new_user(driver, db_name, email, phone, username) -> list:
    """Insert a user owning the given email, phone and username.

    NOTE: the values are placed into the query text unescaped.

    Returns:
        The list of concept rows answered by the insert query (collected
        before the transaction is committed).
    """
    with driver.transaction(db_name, TransactionType.WRITE) as tx:
        query = f"""
        insert 
          $u isa user, has $e, has $p, has $username; 
          $e isa email '{email}'; 
          $p isa phone '{phone}'; 
          $username isa username '{username}';
        """
        inserted_rows = list(tx.query(query).resolve().as_concept_rows())
        tx.commit()
        for inserted in inserted_rows:
            # Report the values as they come back in the answer row.
            stored_phone = inserted.get("p").try_get_string()
            stored_email = inserted.get("e").try_get_string()
            print(f"Added new user. Phone: {stored_phone}, E-mail: {stored_email}")
        return inserted_rows

The insert query returns an Iterator of ConceptRows: one for every execution of the insert clause. We collect the Iterator into a list to store the inserted data. Then we commit the changes, print the phones and emails by iterating through the list of ConceptRows, and return the stored list.

Since the insert query has no match clause, the insert clause is executed exactly once. Insert queries always return a ConceptRowIterator, where every ConceptRow represents an inserted result: column names (variable names) and their respective concepts.

Match query

If we don’t need to convert the result into a JSON document, we can use a single match stage, which returns ConceptRows, similarly to insert.

Let’s retrieve all direct relatives for a user using its email.

def get_direct_relatives_by_email(driver, db_name, email):
    """Print and return the direct relatives of the user owning ``email``.

    Direct relatives are the other users taking part in a ``family`` relation
    with that user.

    Returns:
        A list of concept rows (one ``username`` per relative, sorted
        ascending), or ``None`` when ``email`` does not match exactly one user.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        lookup = tx.query(f"match $u isa user, has email '{email}';").resolve()
        matched_users = len(list(lookup.as_concept_rows()))
        if matched_users != 1:
            print(f"Error: Found {matched_users} users, expected 1.")
            return None

        relatives_query = f"""
              match 
                $e == '{email}';
                $u isa user, has email $e;
                $family isa family ($u, $relative);
                $relative has username $username;
                not {{ $u is $relative; }};
              select $username;
              sort $username asc;
            """
        relatives = list(tx.query(relatives_query).resolve().as_concept_rows())
        for relative in relatives:
            print(f"Relative: {relative.get('username').try_get_string()}")
        if not relatives:
            print("No relatives found.")
        return relatives

The get_direct_relatives_by_email() function checks that there is only one user matched with the email provided by an input parameter. It then executes the query to find the relatives, collects the results, and iterates through them to print the username of every matched relative.

Match query with a function call

Let’s change the query a little to get not only direct relatives, but all relatives of a user. Additionally, let’s use a TypeDB function called all_relatives that we previously defined in the schema.

def get_all_relatives_by_email(driver, db_name, email):
    """Print and return all relatives of the user owning ``email``.

    Relatives are obtained through the ``all_relatives`` function called in
    the query; the user itself is excluded from the result.

    Returns:
        A list of concept rows (one ``username`` per relative, sorted
        ascending), or ``None`` when ``email`` does not match exactly one user.
    """
    with driver.transaction(db_name, TransactionType.READ) as tx:
        lookup = tx.query(f"match $u isa user, has email '{email}';").resolve()
        matched_users = len(list(lookup.as_concept_rows()))
        if matched_users != 1:
            print(f"Error: Found {matched_users} users, expected 1.")
            return None

        relatives_query = f"""
              match 
                $u isa user, has email $e;
                $e == '{email}';
                let $relative in all_relatives($u);
                not {{ $u is $relative; }};
                $relative has username $username;
                select $username;
                sort $username asc;
            """
        relatives = list(tx.query(relatives_query).resolve().as_concept_rows())
        for relative in relatives:
            print(f"Relative: {relative.get('username').try_get_string()}")
        if not relatives:
            print("No relatives found.")
        return relatives

Update query

Let’s replace the phone of one of the users with a new one. The fastest way is to use an update stage, which replaces the old data with the specified values:

def update_phone_by_email(driver, db_name, email, old, new):
    """Replace the phone ``old`` with ``new`` for the user owning ``email``.

    The transaction is committed even when nothing matched.

    Returns:
        The list of concept rows answered by the update query, or ``None``
        when it is empty.
    """
    with driver.transaction(db_name, TransactionType.WRITE) as tx:
        update_query = f"""
          match $u isa user, has email '{email}', has phone $phone; $phone == '{old}';
          update $u has phone '{new}';
        """
        updated_rows = list(tx.query(update_query).resolve().as_concept_rows())
        tx.commit()
        if not updated_rows:
            print("Error: No phones updated")
            return None
        print(f"Total number of phones updated: {len(updated_rows)}")
        return updated_rows

We could also do that manually by deleting the user entity’s ownership of the old phone attribute and assigning it ownership of the new phone attribute, written as consecutive delete and insert pipeline stages.

Executing multiple write stages in a single transaction isolates these changes from other transactions: there won’t be any point in time at which other TypeDB users can’t see a user’s phone. Moreover, if any other transaction makes a conflicting change before we commit this transaction, then our transaction fails upon commit.

Delete query

Finally, let’s delete a user by a given email. This takes just a match stage with a single statement, followed by a very short delete stage.

def delete_user_by_email(driver, db_name, email):
    """Delete every user owning ``email``.

    The transaction is committed even when nothing matched.

    Returns:
        The list of concept rows answered by the delete query, or ``None``
        when it is empty.
    """
    with driver.transaction(db_name, TransactionType.WRITE) as tx:
        delete_query = f"match $u isa user, has email '{email}'; delete $u;"
        deleted_rows = list(tx.query(delete_query).resolve().as_concept_rows())
        tx.commit()
        if not deleted_rows:
            print("Error: No users deleted")
            return None
        print(f"Total number of users deleted: {len(deleted_rows)}")
        return deleted_rows

Learn more

The full source code of this sample application.

The full API reference for the TypeDB Python driver.