Apache Ignite

GridGain Developer Hub - Apache Ignite™

Welcome to the Apache Ignite developer hub run by GridGain. Here you'll find comprehensive guides and documentation to help you start working with Apache Ignite as quickly as possible, as well as support if you get stuck.

 

GridGain also provides the Community Edition, a distribution of Apache Ignite made available by GridGain. It is the fastest and easiest way to get started with Apache Ignite. The Community Edition is generally more stable than the Apache Ignite release available from the Apache Ignite website and may contain extra bug fixes and features that have not yet made it into the release on the Apache website.

 

Distributed Queries

Overview

Ignite supports free-form SQL queries without any limitations. The SQL syntax is ANSI-99 compliant, which means that you can use any SQL functions, aggregations, groupings, or joins defined by the specification as part of an SQL query.

Furthermore, the queries are fully distributed. The SQL engine not only maps a query to specific nodes and reduces their responses into a final result set, but can also join data sets stored in different caches on various nodes. Additionally, the SQL engine is fault-tolerant: it guarantees that you will never get an incomplete or wrong result when a new node joins the cluster or an old one leaves it.

How SQL Queries Work

The Apache Ignite SQL Grid component is tightly coupled with the H2 Database, which, in short, is a fast in-memory and disk-based database written in Java and available under a number of open source licenses.

An embedded H2 instance is always started as part of an Apache Ignite node process whenever the ignite-indexing module is added to the node's classpath. Ignite leverages H2's SQL query parser and optimizer as well as its execution planner. Lastly, H2 executes a query locally on a particular node (either a distributed query is mapped to the node or the query is executed in LOCAL mode) and passes the local result to the distributed Ignite SQL engine for further processing.

However, the data, as well as the indexes, are always stored in the Ignite Data Grid. Additionally, Ignite executes queries in a distributed and fault-tolerant manner which is not supported by H2.

Ignite SQL Grid executes queries in two ways:

First, if a query is executed against a REPLICATED cache on a node where the cache is deployed, then Ignite assumes that all the data is available locally and will run a simple local SQL query passing it directly to the H2 database engine. The same execution flow is true for LOCAL caches.

Local Queries

Learn more about local SQL queries in Ignite from this page.

Second, if a query is executed over a PARTITIONED cache, then the execution flow will be the following:

  • The query will be parsed and split into multiple map queries and a single reduce query.
  • All the map queries are executed on all the data nodes where cache data resides.
  • All the nodes provide result sets of local execution to the query initiator (reducing node) that, in turn, will accomplish the reduce phase by properly merging provided result sets.
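
The map and reduce steps listed above can be illustrated with a small plain-Java sketch. It does not use the Ignite API: the class name, the in-memory lists standing in for data nodes, and the choice of an average-salary query are all assumptions made for illustration. Each "node" computes a partial sum and count over its local rows (the map query), and the initiator merges the partials (the reduce query).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MapReduceAvgSketch {
    /** Partial result produced by one map query on one data node. */
    public static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Map phase: a node aggregates only the rows it holds locally. */
    public static Partial map(List<Integer> localSalaries) {
        long sum = 0;
        for (int salary : localSalaries)
            sum += salary;
        return new Partial(sum, localSalaries.size());
    }

    /** Reduce phase: the initiating node merges the partial results. */
    public static double reduce(List<Partial> partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? 0.0 : (double) sum / count;
    }

    /** Runs the map step on every node, then the single reduce step. */
    public static double avgSalary(List<List<Integer>> nodes) {
        List<Partial> partials = new ArrayList<>();
        for (List<Integer> node : nodes)
            partials.add(map(node));
        return reduce(partials);
    }

    public static void main(String[] args) {
        // Three hypothetical data nodes, each holding part of the cache.
        List<List<Integer>> nodes = Arrays.asList(
            Arrays.asList(1000, 2000),
            Arrays.asList(3000),
            Arrays.asList(4000, 5000, 6000));
        System.out.println(avgSalary(nodes)); // prints 3500.0
    }
}
```

Shipping a sum and a count from each node, rather than a local average, is what keeps the merged result correct when nodes hold different numbers of rows.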

Execution Flow of Cross-Cache Queries

The execution flow of cross-cache or join queries is not different from the one described for the PARTITIONED cache above and will be covered later as part of this documentation.

Query Types

There are two general types of SQL queries available at the Java API level: SqlQuery and SqlFieldsQuery.

Alternative APIs

The Apache Ignite In-Memory SQL Grid is not bound to Java APIs only. You can connect to an Ignite cluster from .NET and C++ using the Ignite ODBC/JDBC drivers and execute SQL queries. Learn more about additional APIs from the pages listed below:

SqlQuery

SqlQuery is useful when, at the end of query execution, you need to get the whole object stored in a cache (key and value) back in the final result set. The code snippet below shows how this can be done.

IgniteCache<Long, Person> cache = ignite.cache("personCache");

// Typed query: keys are Long, values are Person.
SqlQuery<Long, Person> sql = new SqlQuery<>(Person.class, "salary > ?");

// Find all persons earning more than 1,000.
try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs(1000))) {
  for (Entry<Long, Person> e : cursor)
    System.out.println(e.getValue().toString());
}

SqlFieldsQueries

Instead of selecting the whole object, you can select only specific fields in order to minimize network and serialization overhead. For this purpose, Ignite implements the concept of fields queries. SqlFieldsQuery accepts a conventional ANSI-99 SQL query as its constructor parameter and executes it, as shown in the example below.

IgniteCache<Long, Person> cache = ignite.cache("personCache");

// Execute query to get names of all employees.
SqlFieldsQuery sql = new SqlFieldsQuery(
  "select concat(firstName, ' ', lastName) from Person");

// Iterate over the result set.
try (QueryCursor<List<?>> cursor = cache.query(sql)) {
  for (List<?> row : cursor)
    System.out.println("personName=" + row.get(0));
}

Queryable Fields Definition

Before specific fields can be accessed inside a SqlQuery or SqlFieldsQuery, they have to be annotated at the POJO level or defined in a QueryEntity so that the SQL engine becomes aware of them. Refer to the indexes documentation that covers this topic.

Accessing Entry's Key and Value

Use the _key and _val keywords in an SQL query to compare against an entry's complete key or value rather than against individual fields. Apply the same keywords if you need to return a key or a value as a result of an SQL query execution.

Cross-Cache Queries

Data can be queried from multiple caches as part of a single SqlQuery or SqlFieldsQuery. In this case, cache names act as schema names do in conventional RDBMS SQL queries. The name of the cache used to create the IgniteCache instance that executes the query serves as the default schema name and does not need to be specified explicitly. All other objects that are stored in different caches and are queried have to be prefixed with the names of their caches (additional schema names).

// In this example, suppose Person objects are stored in a 
// cache named 'personCache' and Organization objects 
// are stored in a cache named 'orgCache'.
IgniteCache<Long, Person> personCache = ignite.cache("personCache");

// Select with join between Person and Organization to 
// get the names of all the employees of a specific organization.
SqlFieldsQuery sql = new SqlFieldsQuery(
    "select p.name "
        + "from Person as p, \"orgCache\".Organization as org where "
        + "p.orgId = org.id "
        + "and org.name = ?");

// Execute the query and obtain the query result cursor.
try (QueryCursor<List<?>> cursor =  personCache.query(sql.setArgs("Ignite"))) {
    for (List<?> row : cursor)
        System.out.println("Person name=" + row.get(0));
}

In the example above, the SqlFieldsQuery is executed through personCache, whose name is treated as the default schema name. This is why the Person object is accessed without an explicitly specified schema name (from Person as p). The Organization object, in contrast, is stored in a separate cache named orgCache, so the name of that cache must be set as the schema name explicitly in the query ("orgCache".Organization as org).

Changing Schema Name

If you prefer to use a schema name that is different from the cache name, you can use the CacheConfiguration.setSqlSchema(...) method.

Distributed Joins

Ignite supports collocated and non-collocated distributed SQL joins. Moreover, if the data resides in different caches, Ignite allows for cross-cache joins as well.

IgniteCache<Long, Person> cache = ignite.cache("personCache");

// SQL join on Person and Organization.
SqlQuery<Long, Person> sql = new SqlQuery<>(Person.class,
  "from Person as p, \"orgCache\".Organization as org "
  + "where p.orgId = org.id "
  + "and lower(org.name) = lower(?)");

// Find all persons working for Ignite organization.
try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs("Ignite"))) {
  for (Entry<Long, Person> e : cursor)
    System.out.println(e.getValue().toString());
}

Joins between PARTITIONED and REPLICATED caches always work without any limitations.

However, if you join at least two PARTITIONED data sets, then you must either make sure that the keys you are joining on are collocated or enable the non-collocated joins parameter for the query. The two distributed join modes are explained further below.

Data Collocation

To learn more about the data collocation concept and how to use it in practice, refer to the dedicated documentation section.

Distributed Collocated Joins

By default, if an SQL join has to be done across a number of Ignite caches, then all the caches have to be collocated. Otherwise, you will get an incomplete result at the end of query execution, because at the join phase a node uses only the data that is available locally. As Picture 1 below shows, an SQL query is first sent to all the nodes (Q) where the data required for the join is located. The query is then executed right away by every node (E(Q)) over its local data set and, finally, the overall execution result is aggregated on the client side (R).

Picture 1. Collocated SQL Query
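
To make the "incomplete result" case concrete, here is a minimal plain-Java sketch of a join in which every node sees only its local data. It does not use the Ignite API; the class, the two-node layout, and the sample names are hypothetical. The same query is run once with Bob stored next to his organization and once with Bob stored on the other node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollocatedJoinSketch {
    /** One data node: its local slices of the Person and Organization caches. */
    public static final class Node {
        final Map<String, Integer> personOrgIds = new HashMap<>(); // person name -> orgId
        final Map<Integer, String> orgNames = new HashMap<>();     // orgId -> organization name
    }

    /** E(Q): the join executed on one node over its local data only. */
    static List<String> localJoin(Node node, String orgName) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Integer> person : node.personOrgIds.entrySet()) {
            // The organization row is looked up locally; a remote row is invisible here.
            String localOrg = node.orgNames.get(person.getValue());
            if (orgName.equals(localOrg))
                matches.add(person.getKey());
        }
        return matches;
    }

    /** R: the initiator concatenates the per-node results. */
    public static List<String> query(List<Node> nodes, String orgName) {
        List<String> result = new ArrayList<>();
        for (Node node : nodes)
            result.addAll(localJoin(node, orgName));
        Collections.sort(result);
        return result;
    }

    /** Builds a two-node cluster; Bob sits next to his organization only if collocated. */
    public static List<Node> cluster(boolean collocated) {
        Node a = new Node();
        Node b = new Node();
        a.orgNames.put(1, "Ignite");
        b.orgNames.put(2, "Other");
        a.personOrgIds.put("Alice", 1);
        (collocated ? a : b).personOrgIds.put("Bob", 1);
        b.personOrgIds.put("Carol", 2);
        return Arrays.asList(a, b);
    }

    public static void main(String[] args) {
        System.out.println(query(cluster(true), "Ignite"));  // prints [Alice, Bob]
        System.out.println(query(cluster(false), "Ignite")); // prints [Alice]
    }
}
```

In the second run Bob is silently missing from the result, because the node that holds him has no local copy of organization 1.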


Distributed Non-Collocated Joins

Affinity collocation is a powerful concept: once it is set up for an application's business entities (caches), it lets you execute cross-cache joins in the most optimal way and returns a complete and consistent result set. Still, there is always a chance that you won't be able to collocate all the data, in which case you may not be able to execute the whole range of SQL queries needed to satisfy your use case.

The non-collocated distributed joins have been designed and supported by Apache Ignite for cases when it's extremely difficult or impossible to collocate all the data but you still need to execute a number of SQL queries over non-collocated caches.

Do not overuse non-collocated distributed joins in practice. This type of join performs worse than an affinity collocation based join, because fulfilling a query requires many more network round-trips and much more data movement between the nodes.

When non-collocated distributed joins are enabled for a specific SQL query with the SqlQuery.setDistributedJoins(boolean) parameter, the node to which the query was mapped requests the missing data (data that is not present locally) from the remote nodes by sending either broadcast or unicast requests. This is depicted in Picture 2 below as a potential data movement step (D(Q)). Unicast requests are sent only when a join is done on a primary key (cache key) or an affinity key, since the node performing the join then knows the location of the missing data. Broadcast requests are sent in all other cases.

Picture 2. Non-collocated SQL Query
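
The unicast-versus-broadcast rule described above can be sketched in plain Java as follows. This is an illustration only, not Ignite's implementation: the class and method names are hypothetical, and the modulo-based ownerOf function is an assumed stand-in for a real affinity function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class JoinRequestSketch {
    /** Hypothetical affinity function: index of the node that owns a key. */
    public static int ownerOf(int key, int nodeCount) {
        return Math.floorMod(key, nodeCount);
    }

    /**
     * Returns the indexes of the remote nodes that the joining node (requester)
     * asks for the rows matching joinValue.
     */
    public static List<Integer> targetNodes(int requester, int nodeCount,
                                            boolean joinOnKey, int joinValue) {
        if (joinOnKey) {
            // The join is on a cache key or affinity key, so the owner is known.
            int owner = ownerOf(joinValue, nodeCount);
            if (owner == requester)
                return Collections.emptyList();      // data is already local
            return Collections.singletonList(owner); // one unicast request
        }
        // The location is unknown: broadcast to every other node.
        List<Integer> others = new ArrayList<>();
        for (int n = 0; n < nodeCount; n++) {
            if (n != requester)
                others.add(n);
        }
        return others;
    }

    public static void main(String[] args) {
        System.out.println(targetNodes(0, 4, true, 6));  // prints [2]
        System.out.println(targetNodes(0, 4, false, 6)); // prints [1, 2, 3]
    }
}
```

The difference in the number of contacted nodes is the reason a join on a key or affinity key is the cheaper of the two non-collocated cases.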


The broadcast and unicast requests that one node sends to another to get the missing data are not executed sequentially. The SQL engine combines the requests into batches. The batch size can be managed using the SqlQuery.setPageSize(int) parameter.
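
As a rough illustration of batching, the plain-Java sketch below groups pending lookups into batches of at most pageSize items instead of sending them one by one. The class and method are hypothetical, and how Ignite maps the page size onto its internal batches is not specified here, so treat this only as a picture of the general idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RequestBatchingSketch {
    /** Splits the pending lookups into consecutive batches of at most pageSize items. */
    public static <T> List<List<T>> batches(List<T> pending, int pageSize) {
        if (pageSize <= 0)
            throw new IllegalArgumentException("pageSize must be positive");
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < pending.size(); from += pageSize) {
            int to = Math.min(from + pageSize, pending.size());
            // Copy the slice so that each batch is independent of the source list.
            result.add(new ArrayList<>(pending.subList(from, to)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Five hypothetical join keys whose rows are missing locally.
        List<Integer> missingKeys = Arrays.asList(11, 12, 13, 14, 15);
        System.out.println(batches(missingKeys, 2)); // prints [[11, 12], [13, 14], [15]]
    }
}
```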

The following code snippet is provided from the CacheQueryExample included in the Ignite distribution.

IgniteCache<AffinityKey<Long>, Person> cache = ignite.cache("personCache");

// SQL clause query with join over non-collocated data.
String joinSql =
  "from Person, \"orgCache\".Organization as org " +
  "where Person.orgId = org.id " +
  "and lower(org.name) = lower(?)";

SqlQuery<AffinityKey<Long>, Person> qry =
  new SqlQuery<AffinityKey<Long>, Person>(Person.class, joinSql).setArgs("ApacheIgnite");

// Enable distributed joins for the query.
qry.setDistributedJoins(true);

// Execute the query to find out employees for specified organization.
System.out.println("Following people are 'ApacheIgnite' employees (distributed join): " + cache.query(qry).getAll());

Refer to the non-collocated distributed joins blog post for more technical details.

Known Limitations

Transactional SQL

Presently, SQL queries do not take into account transactional boundaries. This means that you can expect dirty reads if an SQL query is executed in parallel with Ignite distributed transactions. For instance, if a transaction atomically changes balances of two accounts, then a concurrent SQL query can see a partially committed transaction.
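
The two-account case can be pictured with a deterministic plain-Java sketch. It is single-threaded and does not use Ignite: the map stands in for a cache, and the "query" is simply a sum computed between the two writes of a transfer, which is the interleaving a concurrent SQL query may happen to observe. All names and amounts are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class DirtyReadSketch {
    /** Stand-in for an SQL query such as "select sum(balance) from Account". */
    static long total(Map<String, Long> accounts) {
        long sum = 0;
        for (long balance : accounts.values())
            sum += balance;
        return sum;
    }

    /** Returns the totals seen before, in the middle of, and after a transfer of 30. */
    public static long[] observe() {
        Map<String, Long> accounts = new HashMap<>();
        accounts.put("A", 100L);
        accounts.put("B", 100L);

        long before = total(accounts);

        accounts.put("A", 70L);        // first write of the transfer becomes visible
        long during = total(accounts); // a query running at this point sees a partial state
        accounts.put("B", 130L);       // second write of the transfer

        long after = total(accounts);
        return new long[] {before, during, after};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [200, 170, 200]
    }
}
```

The middle value is the dirty read: the invariant that the two balances sum to 200 holds before and after the transfer but not in between.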

Multiversion Concurrency Control (MVCC)

Once Apache Ignite SQL Grid is empowered with MVCC, the transactional SQL limitation mentioned above will be eliminated. MVCC development is tracked in this JIRA ticket.

Example

A complete example that demonstrates the usage of the distributed queries covered in this documentation section is delivered as part of every Apache Ignite distribution and is named CacheQueryExample. The example is available on GitHub as well.
