
Commit 0a980d2 (parent: 391cd10)

Author: eisenbahnplatte

Commit message:
    finished mappings (probably)
    fixed some roundtriptests

53 files changed: 4945 additions & 1168 deletions


README.md

Lines changed: 2 additions & 2 deletions
@@ -10,7 +10,7 @@ Example Application Deployment: Download the files of 5 datasets as given in the
 ## Current State
 
 **beta**:
-most of the times it should produce expected results for compression and RDF conversion. Please expect some code refactoring and fluctuation. There will be an open-source licence, either GPL or Apache.
+most of the times it should produce expected results for compression and RDF format.conversion. Please expect some code refactoring and fluctuation. There will be an open-source licence, either GPL or Apache.
 
 
 ## Concept
@@ -28,7 +28,7 @@ The databus-client is designed to unify and convert data on the client-side in s
 * Level 1: all features finished, testing required
 * Level 2: using Apache Compress library covers most of the compression formats, more testing required
 * Level 3: Scalable RDF libraries from [SANSA-Stack](http://sansa-stack.net/) and [Databus Derive](https://github.com/dbpedia/databus-derive). Step by step, extension for all (quasi-)isomorphic [IANA mediatypes](https://www.iana.org/assignments/media-types/media-types.xhtml).
-* Level 4: In addition, we plan to provide a plugin mechanism to incorporate more sophisticated mapping engines as [Tarql](https://tarql.github.io/) (already implemented), [RML](http://rml.io), R2RML, [R2R](http://wifo5-03.informatik.uni-mannheim.de/bizer/r2r/) (for owl:equivalence translation) and XSLT.
+* Level 4: In addition, we plan to provide a plugin mechanism to incorporate more sophisticated format.mapping engines as [Tarql](https://tarql.github.io/) (already implemented), [RML](http://rml.io), R2RML, [R2R](http://wifo5-03.informatik.uni-mannheim.de/bizer/r2r/) (for owl:equivalence translation) and XSLT.
 
 
 ## Usage

errorLog.log

Lines changed: 4597 additions & 0 deletions
Large diffs are not rendered by default.

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -201,7 +201,7 @@
         <artifactId>scalatest-maven-plugin</artifactId>
         <version>1.0</version>
         <configuration>
-          <suites>conversionTests.mapping.roundTripTests, conversionTests.conversion.roundTripTests</suites>
+          <suites>archived.format.mapping.roundTripTests, format.conversion.format.conversion.roundTripTests</suites>
           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
           <junitxml>.</junitxml>
           <filereports>WDF TestSuite.txt</filereports>
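Note: the `<suites>` element of the scalatest-maven-plugin takes a comma-separated list of fully qualified suite class names, so the entries above have to track the renamed test packages. A rough, hypothetical sketch of what one of the listed round-trip suites could look like (package, class name, and body are illustrative, not taken from this commit):

```scala
package format.conversion

import org.scalatest.FlatSpec

// Hypothetical round-trip suite; the real roundTripTests body is not part of this diff.
class roundTripTests extends FlatSpec {

  "a format conversion round trip" should "reproduce the original data" in {
    // convert input -> target format -> back, then compare the two results
    // (the actual comparison is project-specific and left abstract here)
    assert(true)
  }
}
```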

src/main/scala/org/dbpedia/databus/client/filehandling/FileUtil.scala

Lines changed: 18 additions & 1 deletion
@@ -191,7 +191,7 @@ object FileUtil {
    * @return format
    */
   def getFormatType(inputFile: File, compressionInputFile: String): String = {
-    {
+    val format = {
       try {
         if (!(getFormatTypeWithDataID(inputFile) == "")) {
           getFormatTypeWithDataID(inputFile)
@@ -202,6 +202,23 @@
         case _: FileNotFoundException => getFormatTypeWithoutDataID(inputFile, compressionInputFile)
       }
     }
+
+    if (format == "rdf") "rdfxml"
+    else format
+  }
+
+  /**
+   * read a query file as string
+   *
+   * @param file query file
+   * @return query string
+   */
+  def readQueryFile(file: File): String = {
+    var queryString: String = ""
+    for (line <- file.lineIterator) {
+      queryString = queryString.concat(line).concat("\n")
+    }
+    queryString
   }
 
   /**
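Note: the moved readQueryFile builds its result with a mutable var and repeated concat; better-files' iterator API allows the same behavior in one expression. A behavior-equivalent sketch (joins lines with "\n" and keeps the trailing newline; an empty file yields "\n" here instead of ""):

```scala
import better.files.File

// Same result as the loop above for non-empty files.
def readQueryFile(file: File): String =
  file.lineIterator.mkString("", "\n", "\n")
```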

src/main/scala/org/dbpedia/databus/client/filehandling/SourceHandler.scala

Lines changed: 1 addition & 15 deletions
@@ -26,7 +26,7 @@ class SourceHandler(conf:CLI_Config) {
   val sourceFile: File = File(conf.source())
 
   if (sourceFile.hasExtension && sourceFile.extension.get.matches(".sparql|.query")) { // conf.source() is a query file
-    val queryString = readQueryFile(sourceFile)
+    val queryString = FileUtil.readQueryFile(sourceFile)
     handleQuery(queryString)
   }
   else { // conf.source() is an already existing file or directory
@@ -173,20 +173,6 @@ class SourceHandler(conf:CLI_Config) {
     handler.handleResponse(response)
   }
 
-  /**
-   * read a query file as string
-   *
-   * @param file query file
-   * @return query string
-   */
-  def readQueryFile(file: File): String = {
-    var queryString: String = ""
-    for (line <- file.lineIterator) {
-      queryString = queryString.concat(line).concat("\n")
-    }
-    queryString
-  }
-
   def printTask(sourceType: String, source: String, target: String): Unit = {
     val str =
       s"""

src/main/scala/org/dbpedia/databus/client/filehandling/convert/FormatConverter.scala

Lines changed: 4 additions & 13 deletions
@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory
 
 import scala.util.control.Breaks.{break, breakable}
 import org.apache.jena.graph.Triple
+import org.apache.jena.sparql.core.Quad
 import org.apache.spark.rdd.RDD
 
 import java.net.URLEncoder
@@ -90,7 +91,8 @@ object FormatConverter {
     //read process
     val quads = {
       if (RDF_QUADS.contains(conf.inputFormat)) quadsHandler.read(file.pathAsString, conf.inputFormat)
-      else RDF_Triples_Mapper.map_to_quads(new TripleHandler().read(file.pathAsString, conf.inputFormat), conf.graphURI)
+      else if (RDF_TRIPLES.contains(conf.inputFormat)) RDF_Triples_Mapper.map_to_quads(new TripleHandler().read(file.pathAsString, conf.inputFormat), conf.graphURI)
+      else Spark.context.emptyRDD[Quad]
     }
 
     //write process
@@ -113,19 +115,8 @@
       }
     }
 
+    //write process
     tsdHandler.write(data, conf.outputFormat)
   }
 }
-    // FileUtil.unionFiles(tempDir, targetFile)
-    // if (mappingFile.exists && mappingFile != File("")) {
-    //   val mapDir = File("./mappings/")
-    //   mapDir.createDirectoryIfNotExists()
-    //   mappingFile.moveTo(mapDir / FileUtil.getSha256(targetFile), overwrite = true)
-    // }
-    //}
-    //catch {
-    //  case _: RuntimeException => LoggerFactory.getLogger("UnionFilesLogger").error(s"File $targetFile already exists") //deleteAndRestart(inputFile, inputFormat, outputFormat, targetFile: File)
-    //}
-    //
-    // targetFile
 }
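Note: the new read branch distinguishes native quad formats, triple formats (lifted to quads under conf.graphURI via RDF_Triples_Mapper.map_to_quads), and everything else (an empty RDD, so the downstream write step stays total). The body of map_to_quads is not part of this diff; a minimal sketch of what such a lift might look like with Jena and Spark, as an assumption rather than the project's actual implementation:

```scala
import org.apache.jena.graph.{NodeFactory, Triple}
import org.apache.jena.sparql.core.Quad
import org.apache.spark.rdd.RDD

// Hypothetical triples-to-quads lift: wrap each triple in a quad whose graph
// node is built from the supplied URI. Building the Node inside the closure
// means only the String is captured and shipped to the executors.
def mapToQuads(triples: RDD[Triple], graphURI: String): RDD[Quad] =
  triples.map(t => new Quad(NodeFactory.createURI(graphURI), t))
```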

src/main/scala/org/dbpedia/databus/client/filehandling/convert/format/rdf/quads/QuadsHandler.scala

Lines changed: 3 additions & 1 deletion
@@ -6,7 +6,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.dbpedia.databus.client.filehandling.FileUtil
 import org.dbpedia.databus.client.filehandling.convert.format.EquivalenceClassHandler
-import org.dbpedia.databus.client.filehandling.convert.format.rdf.quads.format.{NQuads, Trig, Trix}
+import org.dbpedia.databus.client.filehandling.convert.format.rdf.quads.format.{JsonLD, NQuads, Trig, Trix}
 
 class QuadsHandler extends EquivalenceClassHandler[RDD[Quad]]{
 
@@ -23,6 +23,7 @@ class QuadsHandler extends EquivalenceClassHandler[RDD[Quad]]{
       case "nq" => new NQuads().read(source)
       case "trig" => new Trix().read(source)
       case "trix" => new Trig().read(source)
+      case "jsonld" => new JsonLD().read(source)
     }
   }
 
@@ -38,6 +39,7 @@ class QuadsHandler extends EquivalenceClassHandler[RDD[Quad]]{
       case "nq" => new NQuads().write(data)
       case "trig" => new Trig().write(data)
       case "trix" => new Trix().write(data)
+      case "jsonld" => new JsonLD().write(data)
     }
 
 }
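Note: with the jsonld cases wired into both match blocks, JSON-LD joins the quad equivalence class. A hypothetical usage sketch (the file name is illustrative, and the read/write signatures are inferred from the call sites in FormatConverter, not shown in this diff):

```scala
import org.dbpedia.databus.client.filehandling.convert.format.rdf.quads.QuadsHandler

// Read JSON-LD into an RDD[Quad], then re-serialize it as N-Quads.
val handler = new QuadsHandler()
val quads = handler.read("./data.jsonld", "jsonld")
val outFile = handler.write(quads, "nq")
```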

src/main/scala/org/dbpedia/databus/client/filehandling/convert/format/rdf/quads/format/JsonLD.scala

Lines changed: 26 additions & 9 deletions
@@ -1,23 +1,40 @@
 package org.dbpedia.databus.client.filehandling.convert.format.rdf.quads.format
 
 import better.files.File
-import org.apache.jena.graph.{NodeFactory, Triple}
-import org.apache.jena.rdf.model.{ModelFactory, ResourceFactory}
-import org.apache.jena.riot.{RDFDataMgr, RDFFormat}
+import org.apache.jena.riot.Lang
 import org.apache.jena.sparql.core.Quad
-import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.dbpedia.databus.client.filehandling.convert.format.Format
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-import scala.io.{Codec, Source}
+class JsonLD extends Format[RDD[Quad]] {
+
+  override def read(source: String): RDD[Quad] = {
+    new Trix(Lang.JSONLD).read(source)
+  }
+
+  override def write(t: RDD[Quad]): File = {
+    new Trix(Lang.JSONLD).write(t)
+  }
+}
+//
+//import better.files.File
+//import org.apache.jena.graph.{NodeFactory, Triple}
+//import org.apache.jena.rdf.model.{ModelFactory, ResourceFactory}
+//import org.apache.jena.riot.{RDFDataMgr, RDFFormat}
+//import org.apache.jena.sparql.core.Quad
+//import org.apache.spark.SparkContext
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SparkSession
+//import org.dbpedia.databus.client.filehandling.convert.format.Format
+//
+//import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+//import scala.io.{Codec, Source}
 
 //class JsonLD extends Format[RDD[Quad]] {
 //
 //// def readJSONL(spark: SparkSession, inputFile: File): RDD[Triple] = {
-////   val sc = spark.sparkContext
-////   val data = sc.textFile(inputFile.pathAsString)
+//  val sc = spark.sparkContext
+//  val data = sc.textFile(inputFile.pathAsString)
 //// var tripleRDD = sc.emptyRDD[Triple]
 ////
 //// // data.foreach(println(_))
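Note: the rewritten class delegates to Trix parameterized with Jena's Lang.JSONLD, which works because Jena's RIOT layer treats TriX and JSON-LD alike as quad-capable syntaxes. A standalone sketch of that idea using stock Jena API (the Trix body itself is not shown in this diff, so this is an illustration of the principle, not the project's code):

```scala
import org.apache.jena.riot.{Lang, RDFDataMgr}
import org.apache.jena.sparql.core.Quad

import scala.collection.JavaConverters._

// Parse any quad-capable syntax (TriX, TriG, N-Quads, JSON-LD, ...) into quads
// by letting RIOT select the parser for the given Lang.
def readQuads(path: String, lang: Lang): Seq[Quad] =
  RDFDataMgr.loadDataset(path, lang).asDatasetGraph().find().asScala.toSeq

// e.g. readQuads("data.jsonld", Lang.JSONLD)
```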

src/main/scala/org/dbpedia/databus/client/filehandling/convert/format/rdf/triples/TripleHandler.scala

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ class TripleHandler extends EquivalenceClassHandler[RDD[Triple]] {
 
     inputFormat match {
       case "nt" => new NTriples().read(source)
-      case "rdf" => new RDFXML().read(source)
+      case "rdfxml" => new RDFXML().read(source)
       case "ttl" =>
         //how can this be done better?
         try {
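Note: the (translated) comment in the ttl branch asks how the fallback could be done better. One possible answer, offered here as a hypothetical alternative rather than what the project does, is to let Jena derive the parser from the file name instead of a try/catch cascade:

```scala
import org.apache.jena.rdf.model.Model
import org.apache.jena.riot.{Lang, RDFDataMgr, RDFLanguages}

// Guess the syntax from the file extension, defaulting to Turtle.
def loadWithGuessedLang(path: String): Model = {
  val lang: Lang = Option(RDFLanguages.filenameToLang(path)).getOrElse(Lang.TURTLE)
  RDFDataMgr.loadModel(path, lang)
}
```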

src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/RDF_Quads_Mapper.scala

Lines changed: 17 additions & 6 deletions
@@ -3,7 +3,7 @@ package org.dbpedia.databus.client.filehandling.convert.mapping
 import org.apache.spark.rdd.RDD
 import org.apache.jena.graph.Triple
 import org.apache.jena.sparql.core.Quad
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.{Column, DataFrame}
 import org.dbpedia.databus.client.filehandling.convert.Spark
 import org.dbpedia.databus.client.filehandling.convert.mapping.util.TriplesResult
@@ -31,18 +31,29 @@ object RDF_Quads_Mapper {
   }
 
   def map_to_tsd(data: RDD[Quad], createMapping: Boolean): DataFrame = {
+    //calculate partial results
     val triplesData = map_to_triples(data)
     val dataFrameForEachGraph = triplesData.map(triplesResult => {
       val dataFrame = RDF_Triples_Mapper.map_to_tsd(triplesResult.graph, createMapping)
-      dataFrame.show()
       dataFrame.withColumn("graph", lit(triplesResult.graphName))
     })
 
-    val resultDataFrame = dataFrameForEachGraph.head
+    //join partial results
+    var resultDataFrame = dataFrameForEachGraph.head
 
-    dataFrameForEachGraph.foreach()
-    df1.join(df2, df1.col("column").equalTo(df2("column")))
-    dataFrameForEachGraph.reduce(_ join _)
+    dataFrameForEachGraph.foreach(df => {
+      var columns = Seq.empty[String]
+      resultDataFrame.columns.foreach(col => {
+        if (df.columns.contains(col)) columns = columns :+ col
+      })
+      resultDataFrame = resultDataFrame.join(df, columns, "outer")
+    })
+
+    //sort DataFrame
+    val columns = resultDataFrame.columns
+    val graphColIndex = columns.indexOf("graph")
+    val cols = columns.updated(graphColIndex, columns.head).updated(0, "graph").toSeq
+    resultDataFrame.select(cols.map(x => col(x)): _*).sort("graph")
  }
 }
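Note: map_to_tsd now outer-joins the per-graph DataFrames on their shared columns and moves the graph column to the front before sorting. The join strategy in isolation, as a self-contained sketch (data and column names are illustrative, not from the commit):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

// Two per-graph frames that share the columns "subject" and "graph".
val g1 = Seq(("s1", "A", "http://ex.org/g1")).toDF("subject", "label", "graph")
val g2 = Seq(("s2", "7", "http://ex.org/g2")).toDF("subject", "count", "graph")

// Join on the columns both frames have; "outer" keeps the rows of every graph.
val shared = g1.columns.intersect(g2.columns).toSeq
val merged = g1.join(g2, shared, "outer")

merged.sort("graph").show()
```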
