TypeDB Fundamentals

Creating complex DB workflows with pipelines



This article is part of our TypeDB 3.0 preview series.

A query pipeline is a sequence of query clauses. Each clause describes an operation that takes an input stream of data, and produces an output stream of data. In this article we describe how, from a set of basic query operations, complex data pipelines can be crafted in this way.

At a glance: pipeline syntax essentials

In TypeDB, query inputs and outputs are streams of so-called concept maps: these are maps associating variable names to concepts (i.e., objects, values, or types). TypeDB’s query pipelines are composed of the following basic query clauses as building blocks to operate on such streams.

  • A “match” clause is a query clause of the form match P, where P is a TypeQL pattern. A match clause takes a concept map stream as its input, and produces an output stream by augmenting (in zero or more ways) each mapping in its input with the matched results for additional variables in the pattern P.
  • The “insert” clause insert S (for a sequence of statements S) takes in a stream of maps. It executes each insert statement in S with the given variable mappings. To produce an output stream, each map in the stream is augmented with the newly inserted concepts bound to the given variables.
  • The “delete” clause delete S takes in a map stream, and executes each delete statement in S with the given variable mapping. To produce the output stream, we remove the deleted concepts from each map in the stream.
  • The (new!) “put” clause put S should be thought of as “try matching all of S; if results are found, behave like a match, and otherwise like an insert”.
  • The (new!) “update” clause update S should be thought of as “insert a relation’s roleplayer or an object’s attribute, unless that roleplayer/attribute is already filled, in which case overwrite the existing data instead”.
  • We provide various stream operators (these are called stream operators since, unlike clauses, they do not depend on the state of the database), such as select $x1, $x2 ... (which selects specific variables from the maps in a stream), or sort $x (which re-sorts the entire stream based on the value that $x is mapped to), etc.
  • And, finally, we introduce a new assert control operator, which can be used both to impose custom constraints at query runtime and to control the flow of query pipelines.

We remark that the above concerns data queries, which are separate from the schema queries (using the usual clauses define and undefine as well as the new redefine) and which we discuss in more detail in the schema fundamentals.

  • The beginning of a query pipeline may contain with clauses (which we collectively refer to as the preamble of the query) used to define additional query-level functions, as discussed in our function fundamentals.
  • The end of a query pipeline may comprise a reduce clause, which reduces the incoming stream into a single return value (or tuple thereof), or a fetch clause, which formats the stream into its final output format.

Example pipeline

Let’s see how query pipelines work in practice with a slightly more complex example. Feel free to read through this example in full already — in the following sections we will go through each individual part of the pipeline in more detail.

with fun available_cars($model: car_model) -> car[] :
  match $c isa car, has car_model $model, has status "available";
  return list($c);
with fun requests_by_priority($model: car_model, $limit: int) -> request[] :
  match $request (car_model: $model) isa request, has priority $priority;
  sort $priority desc; # sort by priority high to low
  limit $limit;
  return list($request);
match
  $model isa car_model;
match
  $cars = available_cars($model);
  $requests = requests_by_priority($model, length($cars));
match
  $request = $requests[$number], links (customer: $customer);
  $car = $cars[$number];
select $request, $car, $customer;
put
  $assign (car: $car) isa car_assignment;
assert count($assign) == 1;
assert has_payment_method($customer) == true;
update
  $assign links (customer: $customer);
  $request has status "processed";
match
  $left_over_request isa request;
  not { $left_over_request has status "processed" };
reduce count($left_over_request); 

The with preamble

We’ve already seen how to use with clauses to define query-level functions. In the above pipeline we have two such clauses back-to-back, each defining a function. Note that these functions are using stream modifier clauses (like sort and limit) in their body! We will discuss this shortly.

Extending results with match

Let’s first settle some terminology once and for all:

  • A concept map M is a mapping of the form ($x1 -> c1, $x2 -> c2, ...) where the $x's are variables and the c's are concepts (a concept can be either a data instance, some other computed value, or a type).
  • A map stream S is an (ordered) set of concept maps { M1, M2, ... }.

Note that we use the word set since our streams will usually have no duplicates. However, by deleting concepts using a delete clause, a stream state with duplicates may be reached (as we may forego de-duplication for performance reasons).

A match clause is of the form match P, where P is called a pattern (a familiar term in TypeQL land). A match clause operates on streams as follows. It takes an input stream S, and for each map M in S it assigns the variables in P the data given in M. It then matches possible results R for the remaining unassigned variables, e.g. ($y1 -> r1, $y2 -> r2, ...). For each such result R, it combines M and R into a single map, and adds it to the output stream. Note that there may be zero results R, in which case nothing gets added to the output stream for M.
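To make the stream semantics concrete, here is a minimal Python model of a match clause. It is only a sketch of the behavior described above, not TypeDB's implementation: concept maps are modeled as plain dicts, and the names match_clause, solve_models, solve_has_cars and the AVAILABLE table are hypothetical stand-ins for the database and its pattern matcher.

```python
from typing import Callable, Dict, Iterable, Iterator, List

ConceptMap = Dict[str, object]

def match_clause(stream: Iterable[ConceptMap],
                 solve: Callable[[ConceptMap], List[ConceptMap]]) -> Iterator[ConceptMap]:
    """Extend every input map M with each result R found for the pattern."""
    for m in stream:
        for r in solve(m):      # zero results: nothing is emitted for m
            yield {**m, **r}

# Hypothetical toy data: car models and their number of available cars.
AVAILABLE = {"Ford Fiesta": 3, "Audi A3": 1, "Rolls-Royce Phantom": 0}

def solve_models(m: ConceptMap) -> List[ConceptMap]:
    # Stand-in for `match $model isa car_model;`
    return [{"$model": name} for name in AVAILABLE]

def solve_has_cars(m: ConceptMap) -> List[ConceptMap]:
    # Stand-in for a pattern that only has results if cars are available.
    n = AVAILABLE[m["$model"]]
    return [{"$count": n}] if n > 0 else []

unit_stream = [{}]  # a stream holding one empty map (not the empty stream)
models = list(match_clause(unit_stream, solve_models))
with_cars = list(match_clause(models, solve_has_cars))
```

In this model, with_cars no longer contains a map for "Rolls-Royce Phantom", since the second pattern has no results for it, while feeding the truly empty stream [] into any match clause yields no output at all.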

Match clause #1 in our pipeline

Let’s get one technical aside out of the way: at the beginning of the pipeline, the incoming stream is set to contain only the empty concept map M = (). This is not the empty stream, as it does contain a map!

Now, the very first match clause extends the unit map M with all possible results R for its “new” (i.e. all of its) variables. These extended maps are of the form ($model -> "some car model attribute"). The output stream of this first clause could look something like:

{ 
  ( $model -> "Ford Fiesta" ),
  ( $model -> "Audi A3" ),
  ( $model -> "Rolls-Royce Phantom" ) 
}

This output stream becomes the input stream to the second match clause.

Match clause #2

The second match clause is of the form:

match
  $cars = available_cars($model);
  $requests = requests_by_priority($model, length($cars));

This clause extends the maps in its input stream with two further variables: $cars and $requests. Inspecting the query, each of these variables is assigned the single return value of a function returning a list. The output may look something like this:

{ 
  ( $model -> "Ford Fiesta", $cars -> [<car7>, <car2>, <car4>],
    $requests -> [<req1>, <req3>] ),
  ( $model -> "Audi A3", $cars -> [<car4>], $requests -> [<req2>] ) 
}

where we use <obj> to indicate objects (i.e. entities or relations) in our database.

Note how the third map (for the Rolls-Royce Phantom car model) from the input stream was dropped, because no results matching the new variables were found: in other words, either no available cars or no requests were found in the database for the instance "Rolls-Royce Phantom" of $model.

Match clause #3

In the third match, we query:

match
  $request = $requests[$number], links (customer: $customer);
  $car = $cars[$number];

This extends the maps in our stream with four further variables: $number (a member of an integer range list), $car, $request, and $customer. The end state of this stage of the query could look something like the following:

{ 
  ( $model -> "Ford Fiesta", $cars -> [<car7>, <car2>, <car4>], 
    $requests -> [<req1>, <req3>], $number -> 0, $car -> <car7>, 
    $request -> <req1>, $customer -> <cust113> ),
  ( $model -> "Ford Fiesta", $cars -> [<car7>, <car2>, <car4>], 
    $requests -> [<req1>, <req3>], $number -> 1, $car -> <car2>,
    $request -> <req3>, $customer -> <cust284> ),
  ( $model -> "Audi A3", $cars -> [<car4>], $requests -> [<req2>], 
    $number -> 0, $car -> <car4>, $request -> <req2>, $customer -> <cust8> ) 
}

Effectively, this last match clause “unwinds” in parallel the two lists into their individual items while still keeping track of the original lists … these will be dropped by the subsequent select as we discuss shortly.
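The parallel “unwinding” can be modeled in Python as follows. This is a sketch under the assumption that $number ranges over the indices that are valid in both lists (since both indexing statements must hold at once); the helper name unwind_parallel is made up, and the lookup of $customer is left out.

```python
def unwind_parallel(m, list_vars, index_var="$number"):
    """Yield one extended map per index that is valid in all the given lists.

    `list_vars` maps each list variable to the variable that receives its item.
    """
    length = min(len(m[list_var]) for list_var in list_vars)
    for i in range(length):
        out = dict(m)               # keep the original lists in the map
        out[index_var] = i
        for list_var, item_var in list_vars.items():
            out[item_var] = m[list_var][i]
        yield out

m = {"$cars": ["<car7>", "<car2>", "<car4>"], "$requests": ["<req1>", "<req3>"]}
rows = list(unwind_parallel(m, {"$requests": "$request", "$cars": "$car"}))
```

Here rows holds two maps, one per index 0 and 1; the third car stays unpaired because there is no third request.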

About match clause chaining

If we have two match clauses match P; and match Q; for patterns P and Q, there is, in some sense, no difference between writing match P; match Q;, match P; Q;, match Q; P; and match Q; match P;: indeed, in the declarative semantics of TypeDB, each option will result in the same set of maps, providing all solutions to the constraints of both P and Q.

Nonetheless, in the above pipeline example, we have split our patterns across multiple match clauses. Why? There are two reasons:

  1. It is often intuitive to think of your concept map being built gradually in steps. This ties in with our later discussion of using concept API calls. (On the other hand, for performance, it may be beneficial to inline all constraints.)
  2. Since a match clause extends each map in its input stream individually, chaining match clauses has a “grouping” effect, i.e. later matches are grouped by earlier matches. In other words, while the set of results is unaffected by splitting up matches, the order of results may be.

Note that for performance-critical situations, we will potentially provide an option that disables the “grouping” effect described in (2.): this may yield speed-ups in some situations.
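The grouping effect of chained match clauses can be seen in a small Python model. This is an illustrative sketch only: match_clause, solve_p and solve_q are hypothetical stand-ins for two independent patterns P and Q, and real result orders are up to the database.

```python
def match_clause(stream, solve):
    """Extend each input map with every result of the pattern, in order."""
    for m in stream:
        for r in solve(m):
            yield {**m, **r}

def solve_p(m):
    return [{"$p": 1}, {"$p": 2}]

def solve_q(m):
    return [{"$q": "a"}, {"$q": "b"}]

def as_set(stream):
    """Forget the order of a stream (values must be hashable)."""
    return {frozenset(m.items()) for m in stream}

p_then_q = list(match_clause(match_clause([{}], solve_p), solve_q))
q_then_p = list(match_clause(match_clause([{}], solve_q), solve_p))
```

Both pipelines produce the same four maps, so as_set(p_then_q) == as_set(q_then_p), but p_then_q is grouped by $p while q_then_p is grouped by $q.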

Stream operators

TypeDB 3.0 will ship with five stream operators: select, distinct, sort, limit, and offset.

Selecting variables and the distinct operator

The select $x1, $x2, ... ; operation selects only the mappings for the given variables from an input stream. By default, selecting will not deduplicate the maps in the resulting stream: deduplication is left to the separate distinct operator, as skipping it may yield speed-ups in some situations. For example, for the stream

{
  ($car -> <car1>, $model -> "Fiat 500"),
  ($car -> <car9>, $model -> "Fiat 500"),
  ($car -> <car9>, $model -> "Seat Ibiza")
}

the operation select $model; would yield the output stream

{
  ($model -> "Fiat 500"),
  ($model -> "Fiat 500"),
  ($model -> "Seat Ibiza")
}

To deduplicate this stream, we may apply the distinct operator, yielding

{
  ($model -> "Fiat 500"),
  ($model -> "Seat Ibiza")
}

Note that we can also apply select to streams with optional variables (i.e. those which need not be in every map of a stream; see our optionality fundamentals): for example, select $x, $y will turn the map ($x -> <x>, $z -> <z>) into ($x -> <x>) (so $y is the optional variable here).
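The behavior of select and distinct can be sketched in Python as follows. This models concept maps as dicts and is not TypeDB's implementation; the function names simply mirror the operators.

```python
def select(stream, *variables):
    """Keep only the given variables; optional variables may be absent."""
    for m in stream:
        yield {v: m[v] for v in variables if v in m}

def distinct(stream):
    """Drop maps that were already seen (values must be hashable)."""
    seen = set()
    for m in stream:
        key = frozenset(m.items())
        if key not in seen:
            seen.add(key)
            yield m

stream = [
    {"$car": "<car1>", "$model": "Fiat 500"},
    {"$car": "<car9>", "$model": "Fiat 500"},
    {"$car": "<car9>", "$model": "Seat Ibiza"},
]
selected = list(select(stream, "$model"))
deduplicated = list(distinct(selected))
optional = list(select([{"$x": "<x>", "$z": "<z>"}], "$x", "$y"))
```

In this model, selected still contains "Fiat 500" twice, deduplicated contains it once, and optional contains the single map with only $x bound.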

Sorting streams

The sort $x (asc | desc); clause will sort its input stream (in ascending or descending order) based on the data of the variable $x. For example, consider the input stream

{
  ($car -> <car1>, $model -> "Fiat 500"),
  ($car -> <car9>, $model -> "Fiat 500"),
  ($car -> <car5>, $model -> "Seat Ibiza")
}

Then the clause sort $model desc; would yield the output stream

{
  ($car -> <car5>, $model -> "Seat Ibiza"),
  ($car -> <car1>, $model -> "Fiat 500"),
  ($car -> <car9>, $model -> "Fiat 500")
}

Note that we can also sort on optional variables $x, in which case maps with missing data for $x will be put at the end of the stream (independent of whether the sort is ascending or descending).
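A Python model of this sorting behavior might look as follows. It is a sketch only: the name sort_stream is made up, and keeping the input order among maps with equal values is a property of Python's stable sort that we assume here for illustration, not something the text above guarantees.

```python
def sort_stream(stream, var, descending=False):
    """Sort maps by the value of `var`; maps without `var` always go last."""
    maps = list(stream)
    present = [m for m in maps if var in m]
    missing = [m for m in maps if var not in m]
    # Python's sort is stable, also with reverse=True.
    present.sort(key=lambda m: m[var], reverse=descending)
    return present + missing

stream = [
    {"$car": "<car1>", "$model": "Fiat 500"},
    {"$car": "<car9>", "$model": "Fiat 500"},
    {"$car": "<car5>", "$model": "Seat Ibiza"},
    {"$car": "<car3>"},  # $model is optional and missing here
]
descending = sort_stream(stream, "$model", descending=True)
ascending = sort_stream(stream, "$model")
```

In both directions the map for <car3> ends up last, because it has no value for $model.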

Limiting streams

The limit NUM; operator limits the length of a stream to NUM elements: it truncates its input stream after the NUMth element.

In a query pipeline, the NUM must be an expression not containing variables from the pipeline. In functions, however, it may include variables that are supplied as arguments to the function.

Complementing the limit operator, the offset NUM; operator offsets a stream by NUM elements: it ignores the first NUM elements of its input stream, and outputs the rest of the stream.
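Both operators map directly onto itertools.islice in a Python model of streams. The sketch below also shows one common way of combining them, namely pagination, which is our own illustration rather than something prescribed above.

```python
from itertools import islice

def limit(stream, num):
    """Truncate the stream after its first `num` elements."""
    return islice(stream, num)

def offset(stream, num):
    """Skip the first `num` elements and pass on the rest."""
    return islice(stream, num, None)

stream = [{"$n": i} for i in range(10)]

# Skip three maps, then keep the next four.
page = list(limit(offset(stream, 3), 4))
```

Note that the order of the two operators matters: applying limit 4 first and offset 3 afterwards would leave only a single map.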

Modifiers in functions

As illustrated by our example pipeline, all modifiers can be used in functions as well, but only in between the match and return clauses of a function.

Next, let us see how we can change the data of our database from within a query pipeline, using the dedicated clauses insert, delete, and put.

Inserting data with insert

An insert clause is of the form insert S, where S is a sequence of “insert statements”. These statements perform two different operations: creating new data and new dependencies between data.

Inserting new data

For each map M in the input stream, we first assign data from M to the corresponding variables in S; all remaining variables $y are the “data to be created”. Each such “to-be-created” variable $y must appear in a unique typing statement of the following form.

  • Object creation: a statement $y isa T where T resolves to an entity or relation type. In this case, the insert clause will then create a new object <obj> of type T. We then extend the map M with the mapping $y -> <obj>.
  • Attribute creation: a statement of the form $y EXPR isa T, where T resolves to an attribute type. In this case, the insert clause will then create a new attribute <attr> of type T with underlying value EXPR. We then extend the map M with the mapping $y -> EXPR.

The final case of inserting new data is copying a pre-assigned value into an attribute. Namely, a statement $y isa T, where a mapping $y -> EXPR is already in the input concept map M, will create a new attribute <attr> of type T with underlying value EXPR. Copying a value into an attribute will not extend the map M (i.e. no new variables are being created).
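The object-creation case can be modeled in Python as below. This is a simplified sketch: the database list, the id counter and the helper insert_object are hypothetical, and schema validation is ignored entirely.

```python
import itertools

_ids = itertools.count(1)
database = []  # stand-in for stored data: (type name, object) pairs

def insert_object(stream, var, type_name):
    """Model of `insert $var isa type_name;` for an entity or relation type."""
    for m in stream:
        obj = f"<{type_name}{next(_ids)}>"   # one new object per input map
        database.append((type_name, obj))
        yield {**m, var: obj}                # extend M with $var -> <obj>

stream = [{"$customer": "<cust8>"}, {"$customer": "<cust113>"}]
out = list(insert_object(stream, "$assign", "car_assignment"))
```

The model creates one new object for every map in the input stream, so a stream of two maps leads to two inserted objects and two extended output maps.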

Inserting new data dependencies

Besides creating new objects and copying existing values, an insert clause may contain the following statements that insert dependencies between data.

  • Attribute owners: a statement of the form $obj has ATT or $obj has T EXPR will insert ownership of either an existing typed attribute expression ATT, or, a to-be-created instance of (list) attribute type T and (list) value EXPR. Note, in the second case the type T needs to be supplied. For example, we cannot write $p has "John" as "John" is a value expression of type string; we must write $p has name "John" instead.
  • Relation roles: a statement of the form $obj links (EXPR) or $obj links (T: EXPR) will insert the role player(s) EXPR in the relation object $obj. In the second variation a role type T is supplied. Note, a type needs to be supplied if EXPR could play multiple roles in $obj.

Both of the above statements can also be used in an update clause, which is closely tied to cardinality constraints, and explained in more detail in our constraint fundamentals.

The fine print

An insert clause will result in an error if any schema constraints are violated.

The usual syntactic shortenings apply as well. E.g. $obj isa T; $obj links (A); $obj links (B); can be shortened to $obj isa T, links (A, B); (and, even more concisely, to $obj (A, B) isa T; for compatibility with pre-3.0 syntax).

Deleting data with delete

A delete clause is of the form delete S, where S is a sequence of “delete statements”. For each map M in the input stream, we again assign data from M to the corresponding variables in S—no other variables may appear in the clause.

With all our variables assigned, we then perform deletion of the resulting data-assigned statements. There are three cases of such statements:

  • x isa T deletes x from the type T, unless the object x appears as a roleplayer in some “deletion-blocking” relation object, i.e. a relation object of a type R which has not been marked with @cascade. A query-level cascade annotation can lift this restriction (see next section).
  • x has y (or x has T y) deletes y as an owned attribute for the object x.
  • x links y (or x links (T: y)) deletes y as a roleplayer from the relation object x.

The delete clause will result in an error if any schema constraints are violated by performing the above deletions.
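The effect of the first deletion case on the stream can be sketched in Python. This is a simplified model: the helper delete_isa and the database set are hypothetical, and both schema constraints and deletion-blocking relations are ignored.

```python
def delete_isa(stream, var, database):
    """Model of `delete $var isa T;`: remove the object, then unbind it."""
    for m in stream:
        deleted = m[var]
        database.discard(deleted)
        # Drop every variable that was bound to the deleted concept.
        yield {k: v for k, v in m.items() if v != deleted}

database = {"<car1>", "<car9>"}
stream = [
    {"$car": "<car1>", "$model": "Fiat 500"},
    {"$car": "<car9>", "$model": "Fiat 500"},
]
out = list(delete_isa(stream, "$car", database))
```

After the deletion, out contains the map with only $model bound twice: this is how a delete clause can produce a stream with duplicates.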

Schema- and Query-level @cascade

At the schema-level, we allow deleting role players of relation objects of type R which have been marked with @cascade. Only once the role player cardinality drops below what has been specified in the schema do we delete the relation object itself.

At the query-level, the deletion-blocking behavior of relation objects can be modified by supplying the annotation @cascade(R1, R2, ...) where R1, R2, ... is a list of relation types: in this case, any deletion-blocking relation object from the listed types will be deleted together with x. This also works for nested relations: if x is a roleplayer in an R1-typed relation y, which is a roleplayer in an R2-typed relation z, then the deletion of x will trigger the deletion of both y and z.

We emphasize again that the query- and schema-level behavior of cascading deletes differs in an important point: for schema-level @cascade, relations get deleted once they have insufficient role player cardinality; for query-level @cascade, relations get deleted if any of their role players gets deleted in the delete query.

Inserting data conditionally with put

The put clause has been newly introduced in TypeDB 3.0. Its purpose is to “insert data only if it doesn’t exist yet”. The clause is of the form put S, for a sequence of statements S. It can be explained very easily as follows.

  1. For each map M in the input stream, we first run match S which matches all of S (as a single pattern).
  2. If any results are returned, we extend the map M accordingly with those results.
  3. Otherwise, if no results are returned, we run insert S, and extend the map M accordingly.
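The three steps above can be modeled in Python as a “match, else insert” combinator. This is only a sketch of the described semantics: put_clause, match_assignment, insert_assignment and the assignments table are hypothetical stand-ins for the real clause and the database.

```python
def put_clause(stream, match_s, insert_s):
    """For each map: use the match results if any, otherwise insert."""
    for m in stream:
        results = match_s(m)
        if not results:
            results = [insert_s(m)]
        for r in results:
            yield {**m, **r}

# Hypothetical existing data: one car already has an assignment.
assignments = {"<car7>": "<assign1>"}

def match_assignment(m):
    car = m["$car"]
    return [{"$assign": assignments[car]}] if car in assignments else []

def insert_assignment(m):
    new = f"<assign{len(assignments) + 1}>"
    assignments[m["$car"]] = new
    return {"$assign": new}

stream = [{"$car": "<car7>"}, {"$car": "<car2>"}]
first = list(put_clause(stream, match_assignment, insert_assignment))
second = list(put_clause(stream, match_assignment, insert_assignment))
```

Running the same put twice illustrates why it is convenient: the first run inserts one missing assignment, and the second run only matches, so first and second are equal and no duplicate is created.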

Continuing our pipeline (put in action)

Let’s continue our pipeline example from where we left off (just after the first three match clauses). First, after selecting variables from our last stream of maps, we arrive at the stream:

{ 
  ( $car -> <car7>, $request -> <req1>, $customer -> <cust113> ),
  ( $car -> <car2>, $request -> <req3>, $customer -> <cust284> ),
  ( $car -> <car4>, $request -> <req2>, $customer -> <cust8> ) 
}

Starting with the first map, the subsequent put clause performs the following steps.

  1. Run match $assign (car: <car7>) isa car_assignment; where we assigned the object <car7> from the first map in the stream to the variable $car.
  2. If the match is non-empty, extend the map from the stream with the results for the new variable $assign.
  3. Otherwise, create a new car_assignment object that links (car: <car7>), and add that new object to the map, bound to the variable $assign.

The same logic will be applied to the remaining maps in the input stream.

Note that our usage of put here ensures that if a car had been assigned already then, instead of creating another duplicate car_assignment, we overwrite the existing one in the next step.

This next step after the put clause (ignoring the assert clauses for a moment) is to run an update clause, which updates the customer role in our car assignment with the appropriate new customer for the given car, and sets the request's status to "processed". The final output stream would look like:

{ 
  ( $car -> <car7>, $request -> <req1>, $customer -> <cust113> ),
  ( $car -> <car2>, $request -> <req3>, $customer -> <cust284> ),
  ( $car -> <car4>, $request -> <req2>, $customer -> <cust8> ) 
}

Control flow with assert

We use assert clauses to control the data flowing through our pipeline. If its stated condition is not satisfied, an assert clause will throw an error, causing the pipeline to fail before the next step.

An assert clause will comprise a single “condition” statement. This may be either:

  1. A per-stream statement, which uses a reduction function (such as count, count($var), sum($var), …) of the stream, and checks an appropriate comparator statement for the obtained value (e.g. count($x) >= 3).
  2. A per-map statement, which also uses comparator statements but checks these on each map in the input stream. Note in this case our expressions may include user-defined functions of appropriate type.

In our earlier pipeline example, we used two assert clauses:

assert count($assign) == 1;  # per-stream condition
assert has_payment_method($customer) == true;  # per-map condition

We wanted to ensure both of these conditions before performing the subsequent update:

  1. If there are two or more assignments to the same car, then something is wrong in our data and we should abort the pipeline.
  2. Similarly (somewhat more on the business logic side), we want to double-check that each $customer really has a valid payment method. We do so by calling some function has_payment_method, whose definition we omitted in the example.
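The difference between the two kinds of conditions can be sketched in Python. This is an illustrative model only: the exception class, the helper names and the has_payment_method table are all hypothetical, and both checks are evaluated eagerly so that a failure stops the pipeline before the next step.

```python
class PipelineAssertionError(Exception):
    """Raised when an assert clause fails."""

def assert_per_stream(stream, reduce_fn, predicate):
    """Check a condition on a reduction of the whole stream."""
    maps = list(stream)  # a reduction needs to see the entire stream
    if not predicate(reduce_fn(maps)):
        raise PipelineAssertionError("per-stream condition failed")
    return maps

def assert_per_map(stream, predicate):
    """Check a condition on every single map of the stream."""
    maps = list(stream)
    for m in maps:
        if not predicate(m):
            raise PipelineAssertionError(f"per-map condition failed for {m}")
    return maps

# Hypothetical data standing in for the omitted has_payment_method function.
has_payment_method = {"<cust8>": True, "<cust113>": False}
stream = [{"$customer": "<cust8>"}, {"$customer": "<cust113>"}]

checked = assert_per_stream(stream, len, lambda n: n >= 2)
try:
    assert_per_map(checked, lambda m: has_payment_method[m["$customer"]])
    failed = False
except PipelineAssertionError:
    failed = True
```

In this run the per-stream check passes, while the per-map check fails on the second customer, so no later clause would be executed.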

Fetch clause with subqueries and functions

Finally, let’s discuss the fetch clause, which is a special “stream formatting” clause that can go at the end of query pipelines. In essence, the clause operates as follows.

Each concept map in the input stream will be formatted as a JSON object; that is, there are as many JSON objects output by the fetch clause as there are concept maps in its input. Recall that a JSON object consists of (possibly nested) key-(list-of-)value pairs. Importantly, these (list-of-)values could either be obtained directly from the concept map, or they may be constructed “on the fly”; there are two cases of such constructions.

  1. We can use attribute projection, which simply obtains a single attribute of a specific object (i.e. relation or entity) or a list of such attributes.
  2. We can use a subquery or a function call to retrieve values, lists of values, or even entire JSON objects (by a nested use of fetch!).

A subquery in a fetch clause is essentially a query pipeline itself, but it has to end in either a fetch clause or a reduce operator. Now, let’s see how this plays out in practice!

New fetch syntax

We’ve properly cleaned up the syntax of fetch: now, users have full control over the JSON structure that they will retrieve, and the nesting and key syntax of the fetch clause is a 1-to-1 representation of the JSON objects that will be outputted by the query.

The valid cases of “nested key-(list-of-)value pairs” that can be used in a fetch clause are exhaustively showcased below.

... # incoming pipeline
fetch {
# Printing values directly from pipeline
  "key_1": $x, # var $x (from input stream) holds a value or list
  "key_2": <EXPR>, # <EXPR> is an expression like $x + $y

# Inline attribute retrieval variations
  "key_3": $y.attr, # var $y holds an object with singleton attribute 'attr'
  "key_4": [ $y.attr ], # object var $y has multiple attributes 'attr'
  "key_5": $y.attr[], # object var $y has a list attribute 'attr'

# Function call variations
  "key_6": my_fun1($x,$y), # function my_fun1 has single-return
  "key_7": [ my_fun2($x,$y) ], # function my_fun2 has stream-return

# Match-fetch subqueries
  "key_8": [
    match ...;
    fetch {
      "sub_key": $z, 
      ...
    };
  ]

# Match-reduce-value subqueries
  "key_9": 
    match ...;
    reduce agg($z); # agg could be, e.g., 'count', 'sum', or 'list'

# Nested keys: Nothing stops you from nesting the above!
  "super_key": {
    "sub_key_1": $x,
    "sub_key_2": $y.attr,
    "sub_key_3": [
      ... # some subquery
    ]
  }
};

Importantly, in all of the above cases, we can immediately see the structure of the output JSON objects.

  1. Everything inside a [ ... ] will become a list of values or a list of JSON objects. The minor exception is the postfix [] after an attribute type (case 5 above), which also produces a list of values.
  2. Everything inside a { ... } will produce a single JSON object.

Example

Let’s see a practical example, that also addresses how we handle optional variables. Consider the following query pipeline.

with fun hobbies($employee) -> hobby[] : ...
with fun friends($employee) -> {name, age} : ...
match  
  ($employee, company: $company) isa employment;
  try { ($employee, $spouse) isa marriage; };
select $employee, $spouse;
distinct;
fetch {
  "name" : $employee.name,
  "meta-data" : { 
    "spouse" : $spouse.name,
    "hobbies" : [ 
      hobbies($employee)
    ]
  },
  "good-friends" : [
    match $fr_name, $fr_age in friends($employee);
    fetch {
      "name": $fr_name,
      "age": $fr_age
    };
  ],
  "employment-count": 
    match $emp isa employment, links ($employee, company: $company);
    reduce count;
};

The above pipeline starts with two function declarations (note that we do not provide the implementation of the functions here for brevity: what these functions do precisely is less important than their type signatures).

We then match data from the database, finding employees $employee of companies $company. We also optionally match the $spouse of each $employee. The result of the match is a stream of concept maps that look like this:

($employee -> <emp_obj1>, $spouse -> <spouse_obj4>, $company ->  <comp_obj3>)
($employee -> <emp_obj2>, $company -> <comp_obj9>)
($employee -> <emp_obj2>, $company -> <comp_obj31>)
($employee -> <emp_obj2>, $company -> <comp_obj2>)

We then select only $employee and $spouse from this stream, yielding:

($employee -> <emp_obj1>, $spouse -> <spouse_obj4>)
($employee -> <emp_obj2>)
($employee -> <emp_obj2>)
($employee -> <emp_obj2>)

Using the distinct operator we deduplicate this stream to obtain:

($employee -> <emp_obj1>, $spouse -> <spouse_obj4>)
($employee -> <emp_obj2>)

Finally, we apply our fetch clause. Since our stream above contains two concept maps, we will obtain two JSON objects from the output of fetch. This could look as follows:

# first JSON
{
  "name" : "John M.",
  "meta-data" : {
    "spouse" : "Elena M.",
    "hobbies" : [ "Guitar playing", "Bird watching" ]
  },
  "good-friends" : [
    {
      "name": "Joseph",
      "age": 24
    },
    {
      "name": "Jenny",
      "age": 42
    }
  ],
  "employment-count": 1
}
# second JSON
{
  "name" : "Humanoid 127",
  "meta-data" : {
    "spouse" : null,
    "hobbies" : [ "Vacuum cleaning", "Recharging" ]
  },
  "good-friends" : [
      # still no friends :-(
  ],
  "employment-count": 3
}

Notice that all brackets (for objects { ... } and for lists [ ... ]) are in 1-to-1 correspondence with the brackets in our earlier fetch clause!
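The way a fetch clause mirrors the shape of its output can be modeled in Python with a nested template. This is a sketch of the idea only, not of TypeDB's fetch implementation: the functions fetch and build, the lambda-based template, and the NAMES and HOBBIES tables are all hypothetical.

```python
import json

def build(node, m):
    """Evaluate a template node against one concept map."""
    if isinstance(node, dict):       # { ... } yields a single JSON object
        return {key: build(value, m) for key, value in node.items()}
    if isinstance(node, list):       # [ ... ] yields a list of values
        (producer,) = node           # exactly one stream-producing entry
        return list(producer(m))
    return node(m)                   # leaf: a single value (or None)

def fetch(stream, template):
    """Produce one JSON-like dict per concept map in the stream."""
    return [build(template, m) for m in stream]

NAMES = {"<emp1>": "John M.", "<sp4>": "Elena M.", "<emp2>": "Humanoid 127"}
HOBBIES = {
    "<emp1>": ["Guitar playing", "Bird watching"],
    "<emp2>": ["Vacuum cleaning", "Recharging"],
}

template = {
    "name": lambda m: NAMES[m["$employee"]],
    "meta-data": {
        # An unbound optional variable results in None, i.e. JSON null.
        "spouse": lambda m: NAMES.get(m.get("$spouse")),
        "hobbies": [lambda m: HOBBIES[m["$employee"]]],
    },
}

stream = [{"$employee": "<emp1>", "$spouse": "<sp4>"}, {"$employee": "<emp2>"}]
docs = fetch(stream, template)
as_json = json.dumps(docs[1])
```

As in the article's example, the number of output objects equals the number of input maps, and the missing optional $spouse of the second map shows up as null in the serialized JSON.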

Summary

Pipelines are a powerful tool: they allow users to craft their queries incrementally, and to build complex database workflows. The fact that queries become compositional

In the future, we may add further features to pipelines (e.g. branching pipelines, output formats other than JSON), but for now our model will stick to the “basics” of linear pipelines as outlined above.

Further Learning

The TypeDB 3.0 Roadmap

TypeDB 3.0 is a major milestone, delivering significant updates to the architecture, and capabilities based on our research and user feedback.

Constraints (3.0 Preview)

Learn about the constraint language of TypeDB's functional database programming model.

Functions (3.0 Preview)

Functions provide powerful abstractions of query logic, which can be nested, recursed, or negated, and they natively embed into TypeQL's declarative patterns.
