SANSA Query

Description

SANSA Query is a library for running SPARQL queries over RDF data using the big data engines Spark and Flink. It can query RDF data residing either in HDFS or in a local file system. Queries are executed in a distributed and parallel fashion across Spark RDDs/DataFrames or Flink DataSets. Furthermore, SANSA Query can use Spark to query non-RDF data stored in databases such as MongoDB, Cassandra or MySQL, or in file formats such as Parquet.

For RDF data, SANSA uses a vertical partitioning (VP) approach and is designed to support extensible partitioning of RDF data. Instead of dealing with a single triple table (s, p, o), data is partitioned into multiple tables based on the RDF predicates, RDF term types and literal datatypes used. The first column of these tables is always a string representing the subject. The second column always represents the literal value as a Scala/Java datatype. Tables for storing literals with language tags have an additional third string column for the language tag. SANSA supports Sparqlify and Ontop as scalable SPARQL-to-SQL rewriters.
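To make the table layout concrete, here is a minimal sketch of vertical partitioning in plain Scala collections, without Spark. It is not SANSA's implementation: the triple model (`Term`, `Iri`, `Lit`, `T`) is a hypothetical stand-in for Jena's `Triple`, `VpKey` is an illustrative partition key, and only `xsd:integer` literals are converted to a Scala type (`BigInt`) while all other literals stay strings, which is a simplification.

```scala
// Hypothetical stand-in RDF model; SANSA itself works on org.apache.jena.graph.Triple
sealed trait Term
final case class Iri(uri: String) extends Term
final case class Lit(lexical: String, datatype: String, lang: String = "") extends Term
final case class T(s: Iri, p: Iri, o: Term)

// Hypothetical key: one VP table per (predicate, object term type, literal datatype, language-tag flag)
final case class VpKey(predicate: String, termType: String, datatype: String, hasLang: Boolean)

object VerticalPartitioningSketch {

  def keyOf(t: T): VpKey = t.o match {
    case Iri(_)           => VpKey(t.p.uri, "iri", "", hasLang = false)
    case Lit(_, dt, lang) => VpKey(t.p.uri, "literal", dt, lang.nonEmpty)
  }

  // First column: subject string; second column: object value;
  // language-tagged literals get a third column holding the tag
  def row(t: T): Product = t.o match {
    case Iri(uri)                                   => (t.s.uri, uri)
    case Lit(lex, _, lang) if lang.nonEmpty         => (t.s.uri, lex, lang)
    case Lit(lex, dt, _) if dt.endsWith("#integer") => (t.s.uri, BigInt(lex)) // simplified datatype mapping
    case Lit(lex, _, _)                             => (t.s.uri, lex)
  }

  // Group the triples into VP tables and turn each triple into a row of its table
  def partition(triples: Seq[T]): Map[VpKey, Seq[Product]] =
    triples.groupBy(keyOf).map { case (key, ts) => key -> ts.map(row) }
}
```

Two `ex:age` integer triples end up in the same two-column table, while a language-tagged `ex:name` literal gets its own three-column table.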

For heterogeneous data sources (data lake), SANSA uses virtual property table (PT) partitioning, whereby the data relevant to a query is loaded on the fly into Spark DataFrames whose attributes correspond to the properties used in the query.

SANSA Query SPARK - RDF

In SANSA Query Spark for RDF, the method for partitioning an RDD[Triple] is located in RdfPartitionUtilsSpark. It uses an RdfPartitioner, which maps a Triple to a single RdfPartitionStateDefault instance.

  • RdfPartition - as the name suggests, represents a partition of the RDF data and defines two methods:
    • matches(Triple): Boolean: This method is used to test whether a triple fits into a partition.
    • layout: TripleLayout: This method returns the TripleLayout associated with the partition, as explained below.
    • Furthermore, RdfPartitions are expected to be serializable and to define equals and hashCode.
  • TripleLayout instances are used to obtain framework-agnostic, compact tabular representations of triples according to a partition. For this purpose, it defines two methods:
    • fromTriple(triple: Triple): Product: For a given triple, this method must return its representation as a Product (the supertype of all Scala tuples).
    • schema: Type: This method must return the exact Scala type of the objects returned by fromTriple, such as typeOf[Tuple2[String, Double]]. Hence, layouts are expected to only yield instances of one specific type.
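The two contracts above can be illustrated with a self-contained sketch. The traits below only mirror the methods described here and are not SANSA's actual definitions; `SimpleTriple`, `LayoutStringDouble` and `DoublePartition` are hypothetical names, and `schema` uses `scala.reflect.runtime.universe.Type`, which assumes `scala-reflect` is on the classpath.

```scala
import scala.reflect.runtime.universe._

// Hypothetical stand-in for org.apache.jena.graph.Triple
final case class SimpleTriple(s: String, p: String, o: String, datatype: Option[String])

// Mirrors the described TripleLayout contract: every row has one fixed Scala type
trait TripleLayout extends Serializable {
  def fromTriple(triple: SimpleTriple): Product
  def schema: Type
}

// Mirrors the described RdfPartition contract
trait RdfPartition extends Serializable {
  def matches(triple: SimpleTriple): Boolean
  def layout: TripleLayout
}

// Hypothetical layout for xsd:double literals: rows of (subject, value)
object LayoutStringDouble extends TripleLayout {
  override def fromTriple(t: SimpleTriple): Product = (t.s, t.o.toDouble)
  override def schema: Type = typeOf[Tuple2[String, Double]]
}

// A case class is serializable and gets equals and hashCode for free
final case class DoublePartition(predicate: String) extends RdfPartition {
  private val XsdDouble = "http://www.w3.org/2001/XMLSchema#double"

  override def matches(t: SimpleTriple): Boolean =
    t.p == predicate && t.datatype.contains(XsdDouble)

  override def layout: TripleLayout = LayoutStringDouble
}
```

Using a case class for the partition is one simple way to satisfy the serializability and equality requirements, since partitions are typically used as keys when grouping triples.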

See the available layouts for details.

SANSA Query SPARK - Heterogeneous Data Sources

SANSA Query Spark for heterogeneous data sources (data lake) is composed of four main components:

  • Analyser: it extracts the SPARQL triple patterns and groups them by subject; it also extracts any operations on subjects, such as filters, GROUP BY, ORDER BY, DISTINCT and LIMIT.
  • Planner: it extracts joins between subject-based triple pattern groups and generates a join plan accordingly. The join order followed is left-deep.
  • Mapper: it accesses the (RML) mappings and matches the properties of a subject-based triple pattern group against the attributes of individual data sources. If a match exists for every property of the triple pattern group, the respective data source is declared relevant and loaded into a Spark DataFrame. The loading into DataFrames is performed using Spark connectors.
  • Executor: it analyses the SPARQL query and generates equivalent Spark SQL functions over DataFrames for SELECT, WHERE, GROUP BY, ORDER BY and LIMIT. Connections between subject-based triple pattern groups are translated into JOINs between the relevant Spark DataFrames.
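The Analyser, Mapper and Planner steps can be sketched as a toy in plain Scala, with no Spark and no real RML processing. All names here are hypothetical: `SourceMapping` stands in for an RML mapping by listing the properties a source can provide, and `leftDeepOrder` is one simple greedy way to build a left-deep join order, not necessarily the one SANSA-DataLake uses.

```scala
// A SPARQL triple pattern; terms starting with '?' are variables
final case class TriplePattern(s: String, p: String, o: String)

// Hypothetical stand-in for an RML mapping: the properties a data source can provide
final case class SourceMapping(source: String, properties: Set[String])

object DataLakeSketch {

  // Analyser: group the triple patterns into subject-based stars
  def stars(bgp: Seq[TriplePattern]): Map[String, Seq[TriplePattern]] =
    bgp.groupBy(_.s)

  // Mapper: a source is relevant only if it maps every property used in the star
  def relevantSources(star: Seq[TriplePattern], mappings: Seq[SourceMapping]): Seq[String] = {
    val props = star.map(_.p).toSet
    mappings.filter(m => props.subsetOf(m.properties)).map(_.source)
  }

  // Planner (part 1): star A joins star B when an object of A is the subject of B
  def joins(stars: Map[String, Seq[TriplePattern]]): Seq[(String, String)] =
    for {
      (subj, star) <- stars.toSeq.sortBy(_._1)
      tp <- star
      if tp.o != subj && stars.contains(tp.o)
    } yield (subj, tp.o)

  // Planner (part 2): greedy left-deep order; each step joins one more star to the
  // result so far, preferring stars connected to the ones already joined
  def leftDeepOrder(subjects: Seq[String], edges: Seq[(String, String)]): Seq[String] = {
    def connected(a: String, b: String): Boolean =
      edges.contains((a, b)) || edges.contains((b, a))
    var order = subjects.take(1).toVector
    var rest = subjects.drop(1)
    while (rest.nonEmpty) {
      // fall back to the next star (i.e. a cartesian product) if none is connected
      val next = rest.find(s => order.exists(o => connected(o, s))).getOrElse(rest.head)
      order = order :+ next
      rest = rest.filterNot(_ == next)
    }
    order
  }
}
```

In the real component, each relevant source would then be loaded into a DataFrame and the Executor would join the DataFrames in the planned order.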

Usage

Requirements

We currently require Spark 2.4.x with Scala 2.12.

Release Version

Some of our dependencies are not in Maven Central, so you need to add the following Maven repository to the repositories section of your project POM file:

<repository>
   <id>maven.aksw.internal</id>
   <name>AKSW Release Repository</name>
   <url>http://maven.aksw.org/archiva/repository/internal</url>
   <releases>
      <enabled>true</enabled>
   </releases>
   <snapshots>
      <enabled>false</enabled>
   </snapshots>
</repository>

Add the following Maven dependency to your project POM file:

<!-- SANSA Querying -->
<dependency>
   <groupId>net.sansa-stack</groupId>
   <artifactId>sansa-query-spark_2.12</artifactId>
   <version>$LATEST_RELEASE_VERSION$</version>
</dependency>

SNAPSHOT Version

While the release versions are available on Maven Central, the latest SNAPSHOT versions have to be installed from source code:

git clone https://github.com/SANSA-Stack/SANSA-Stack.git
cd SANSA-Stack
mvn -am -DskipTests -pl :sansa-query-spark_2.12 clean install 

Alternatively, you can add the following Maven repository to the repositories section of your project POM file:

<repository>
   <id>maven.aksw.snapshots</id>
   <name>AKSW Snapshot Repository</name>
   <url>http://maven.aksw.org/archiva/repository/snapshots</url>
   <releases>
      <enabled>false</enabled>
   </releases>
   <snapshots>
      <enabled>true</enabled>
   </snapshots>
</repository>

Then do the same as for the release version and add the dependency:

<!-- SANSA Querying -->
<dependency>
   <groupId>net.sansa-stack</groupId>
   <artifactId>sansa-query-spark_2.12</artifactId>
   <version>$LATEST_SNAPSHOT_VERSION$</version>
</dependency>

Running from code

The following Scala code shows how to query an RDF file with SPARQL (be it a local file or a file residing in HDFS):

From file

You can find the example code also here

import net.sansa_stack.query.spark.api.domain.ResultSetSpark
import net.sansa_stack.query.spark.ontop.QueryEngineFactoryOntop
import net.sansa_stack.rdf.spark.io._
import org.apache.jena.graph.Triple
import org.apache.jena.query.ResultSet
import org.apache.jena.rdf.model.Model
import org.apache.jena.riot.Lang
import org.apache.jena.sparql.core.Var
import org.apache.jena.sparql.engine.binding.Binding
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// a SparkSession is needed
val spark = SparkSession.builder
        .appName("SPARQL engine example")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo serialization must be enabled, together with some custom registrators
        .config("spark.kryo.registrator", String.join(
          ", ",
          "net.sansa_stack.rdf.spark.io.JenaKryoRegistrator",
          "net.sansa_stack.query.spark.ontop.OntopKryoRegistrator",
          "net.sansa_stack.query.spark.sparqlify.KryoRegistratorSparqlify"))
        .config("spark.sql.crossJoin.enabled", true) // must be enabled if your SPARQL query makes use of a cartesian product; note: in Spark 3.x it is enabled by default
        .getOrCreate()

// let's assume two separate RDF files
val pathToRdfFile1 = "path/to/rdf1.nt"
val pathToRdfFile2 = "path/to/rdf2.nt"

// load the first file into an RDD of triples (from an N-Triples file here)
val triples1 = spark.rdf(Lang.NTRIPLES)(pathToRdfFile1)

// create the main query engine
// we do provide two different SPARQL-to-SQL rewriter backends, Sparqlify and Ontop
val queryEngineFactory = new QueryEngineFactoryOntop(spark) // Ontop
// or
// val queryEngineFactory = new QueryEngineFactorySparqlify(spark) // Sparqlify

// create the query execution factory for the first dataset
val qef1 = queryEngineFactory.create(triples1)

// depending on the query type, execute the query with one of the following methods

// a) SELECT query returns a ResultSetSpark which holds an
//    RDD of bindings and the result variables
def doSelectQuery(): Unit = {
  val query = "SELECT ..."
  val qe = qef1.createQueryExecution(query)
  val result: ResultSetSpark = qe.execSelectSpark()
  val resultBindings: RDD[Binding] = result.getBindings // the bindings, i.e. mappings from vars to RDF resources
  val resultVars: Seq[Var] = result.getResultVars // the result vars of the SPARQL query
}

// b) CONSTRUCT query returns an RDD of triples
def doConstructQuery(): Unit = {
  val query = "CONSTRUCT ..."
  val qe = qef1.createQueryExecution(query)
  val result: RDD[Triple] = qe.execConstructSpark()
}

// c) ASK query returns a boolean value
def doAskQuery(): Unit = {
  val query = "ASK ..."
  val qe = qef1.createQueryExecution(query)
  val result: Boolean = qe.execAsk()
}

doSelectQuery()
doConstructQuery()
doAskQuery()


// you may have noticed that for SELECT and CONSTRUCT queries we used methods ending in "Spark()":
// those methods keep the results distributed, i.e. as an RDD.
// For convenience, we also support the same methods without this suffix; they fetch the results to the driver,
// where they can be processed without the pros and cons of Spark:

// a) SELECT query returns an Apache Jena ResultSet wrapping bindings and variables
def doSelectQueryToLocal(): Unit = {
  val query = "SELECT ..."
  val qe = qef1.createQueryExecution(query)
  val result: ResultSet = qe.execSelect()
}

// b) CONSTRUCT query returns an Apache Jena Model wrapping the triples as Statements
def doConstructQueryToLocal(): Unit = {
  val query = "CONSTRUCT ..."
  val qe = qef1.createQueryExecution(query)
  val result: Model = qe.execConstruct()
}

doSelectQueryToLocal()
doConstructQueryToLocal()

// so far we have used only a single dataset, but your workflow might involve several datasets.
// In that case, keep in mind that a query execution factory is immutable and bound to a specific dataset,
// so for another dataset we have to create another query execution factory, similar to the first one:

// load the second file into an RDD of triples (from an N-Triples file here)
val triples2 = spark.rdf(Lang.NTRIPLES)(pathToRdfFile2)

// create the query execution factory for the second dataset
val qef2 = queryEngineFactory.create(triples2)

// then run queries on the second dataset using our new query execution factory:
val query = "CONSTRUCT ..."
val qe = qef2.createQueryExecution(query)
val result: RDD[Triple] = qe.execConstructSpark()

// if you want to run queries over both datasets, you have to merge them before creating the query execution factory,
// either already during loading
val triples12 = spark.rdf(Lang.NTRIPLES)(Seq(pathToRdfFile1, pathToRdfFile2).mkString(","))
// or compute the union of them
// val triples12 = triples1.union(triples2)

// then as before, create query execution factory
val qef12 = queryEngineFactory.create(triples12)

// and run the queries ...

SPARQL 1.1 Language Support

With Ontop integrated and used as the SPARQL-to-SQL rewriter, we cover the following SPARQL 1.1 features; the Coverage column gives the number of supported features listed in each row¹:

| Section in SPARQL 1.1 | Features | Coverage |
|---|---|---|
| 5. Graph Patterns | BGP, FILTER | 2/2 |
| 6. Including Optional Values | OPTIONAL | 1/1 |
| 7. Matching Alternatives | UNION | 1/1 |
| 8. Negation | MINUS, FILTER [NOT] EXISTS | 1/2 |
| 9. Property Paths | PredicatePath, InversePath, ZeroOrMorePath, ... | 0 |
| 10. Assignment | BIND, VALUES | 2/2 |
| 11. Aggregates | COUNT, SUM, MIN, MAX, AVG, GROUP_CONCAT, SAMPLE | 6/6 |
| 12. Subqueries | Subqueries | 1/1 |
| 13. RDF Dataset | GRAPH, FROM [NAMED] | 1/2 |
| 14. Basic Federated Query | SERVICE | 0 |
| 15. Solution Seqs. & Mods. | ORDER BY, SELECT, DISTINCT, REDUCED, OFFSET, LIMIT | 6/6 |
| 16. Query Forms | SELECT, CONSTRUCT, ASK, DESCRIBE | 4/4 |
| 17.4.1. Functional Forms | BOUND, IF, COALESCE, EXISTS, NOT EXISTS, \|\|, &&, =, sameTerm, IN, NOT IN | 7/11 |
| 17.4.2. Functions on RDF Terms | isIRI, isBlank, isLiteral, isNumeric, str, lang, datatype, IRI, BNODE, STRDT, STRLANG, UUID, STRUUID | 11/13 |
| 17.4.3. Functions on Strings | STRLEN, SUBSTR, UCASE, LCASE, STRSTARTS, STRENDS, CONTAINS, STRBEFORE, STRAFTER, ENCODE_FOR_URI, CONCAT, langMatches, REGEX, REPLACE | 14/14 |
| 17.4.4. Functions on Numerics | abs, round, ceil, floor, RAND | 5/5 |
| 17.4.5. Functions on Dates & Times | now, year, month, day, hours, minutes, seconds, timezone, tz | 8/9 |
| 17.4.6. Hash Functions | MD5, SHA1, SHA256, SHA384, SHA512 | 5/5 |
| 17.5 XPath Constructor Functions | casting | 0 |
| 17.6 Extensible Value Testing | user defined functions | 0 |

Limitations

  • In the implementation of the function langMatches, the second argument has to be a constant: allowing variables would have a negative impact on performance in our framework.
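To illustrate the langMatches limitation, the two query strings below differ only in how the language range is given; either could be passed to createQueryExecution as in the example above. The object name `LangMatchesQueries` and the use of `rdfs:label` are illustrative choices, not part of SANSA.

```scala
// Hypothetical holder for two illustrative SPARQL queries
object LangMatchesQueries {

  // Supported: the second argument of langMatches is a constant literal
  val withConstantRange: String =
    """SELECT ?s ?label WHERE {
      |  ?s <http://www.w3.org/2000/01/rdf-schema#label> ?label .
      |  FILTER(langMatches(lang(?label), "en"))
      |}""".stripMargin

  // Not supported: the second argument of langMatches is a variable (?range)
  val withVariableRange: String =
    """SELECT ?s ?label WHERE {
      |  VALUES ?range { "en" "de" }
      |  ?s <http://www.w3.org/2000/01/rdf-schema#label> ?label .
      |  FILTER(langMatches(lang(?label), ?range))
      |}""".stripMargin
}
```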

¹ taken from the original Ontop web site at: https://ontop-vkg.org/guide/compliance.html#sparql-1-1

An overview is given in the FAQ section of the SANSA project page. Further documentation about the builder objects can also be found on the ScalaDoc page.

For querying heterogeneous data sources, refer to the documentation of the dedicated SANSA-DataLake component.

How to Contribute

We always welcome new contributors to the project! Please see our contribution guide for more details on how to get started contributing to SANSA.