Skip to content

Commit 391cd10

Browse files
added more mapping
1 parent c8606ff commit 391cd10

13 files changed

Lines changed: 81 additions & 66 deletions

File tree

src/main/scala/org/dbpedia/databus/client/filehandling/CompileConfig.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ class CompileConfig(val inputFormat: String,
1010
val mapping: String,
1111
val delimiter: Character,
1212
val quotation: Character,
13-
val createMapping: Boolean) {
13+
val createMapping: Boolean,
14+
val graphURI: String,
15+
val outFile: File) {
1416

1517
var sha = ""
1618
}

src/main/scala/org/dbpedia/databus/client/filehandling/FileHandler.scala

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,20 @@ class FileHandler(cliConfig: CLI_Config) {
3939
mapping = cliConfig.mapping(),
4040
delimiter = cliConfig.delimiter().toCharArray.head,
4141
quotation = cliConfig.quotation().toCharArray.head,
42-
createMapping = cliConfig.createMapping()
42+
createMapping = cliConfig.createMapping(),
43+
graphURI = cliConfig.graphURI(),
44+
outFile = getOutputFile(inputFile)
4345
)
4446

45-
val outFile: File = getOutputFile(inputFile)
46-
4747
// Without any Conversion
4848
if ((config.inputCompression == config.outputCompression) && (config.inputFormat == config.outputFormat)) {
49-
copyStream(new FileInputStream(inputFile.toJava), new FileOutputStream(outFile.toJava))
50-
Some(outFile)
49+
copyStream(new FileInputStream(inputFile.toJava), new FileOutputStream(config.outFile.toJava))
50+
Some(config.outFile)
5151
}
5252
// Only Compression Conversion
5353
else if (config.inputCompression != config.outputCompression && (config.inputFormat == config.outputFormat)) {
54-
copyStream(Compressor.decompress(inputFile), Compressor.compress(outFile, config.outputCompression))
55-
Some(outFile)
54+
copyStream(Compressor.decompress(inputFile), Compressor.compress(config.outFile, config.outputCompression))
55+
Some(config.outFile)
5656
}
5757

5858
// File Format Conversion (need to uncompress anyway)
@@ -77,27 +77,25 @@ class FileHandler(cliConfig: CLI_Config) {
7777
}
7878

7979
if (formatConvertedData.isDirectory){
80-
outFile.createDirectoryIfNotExists()
80+
config.outFile.createDirectoryIfNotExists()
8181
val formatConvertedFiles = formatConvertedData.children
82-
var i = 1
8382
while(formatConvertedFiles.hasNext) {
8483
val formatConvertedFile = formatConvertedFiles.next()
8584
val newOutFile = {
86-
if (config.outputCompression.nonEmpty) outFile / s"$i.${config.outputFormat}.${config.outputCompression}"
87-
else outFile / s"$i.${config.outputFormat}"
85+
if (config.outputCompression.nonEmpty) config.outFile / s"${formatConvertedFile.name}.${config.outputCompression}"
86+
else config.outFile / s"${formatConvertedFile.name}"
8887
}
8988
val compressedOutStream = Compressor.compress(newOutFile, config.outputCompression)
9089
copyStream(new FileInputStream(formatConvertedFile.toJava), compressedOutStream)
91-
i+=1
9290
}
9391
} else {
94-
val compressedOutStream = Compressor.compress(outFile, config.outputCompression)
92+
val compressedOutStream = Compressor.compress(config.outFile, config.outputCompression)
9593
copyStream(new FileInputStream(formatConvertedData.toJava), compressedOutStream)
9694
}
9795

9896
//DELETE TEMPDIR
9997
// if (typeConvertedFile.parent.exists) typeConvertedFile.parent.delete()
100-
Some(outFile)
98+
Some(config.outFile)
10199
}
102100

103101
}

src/main/scala/org/dbpedia/databus/client/filehandling/convert/FormatConverter.scala

Lines changed: 27 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ import org.dbpedia.databus.client.filehandling.convert.format.rdf.quads.QuadsHan
77
import org.dbpedia.databus.client.filehandling.{CompileConfig, FileUtil}
88
import org.dbpedia.databus.client.filehandling.convert.format.tsd.TSDHandler
99
import org.dbpedia.databus.client.filehandling.convert.format.rdf.triples.TripleHandler
10-
import org.dbpedia.databus.client.filehandling.convert.mapping.{MappingInfo, RDF_Quads_Mapper, RDF_Triples_Mapper, TSD_Mapper}
10+
import org.dbpedia.databus.client.filehandling.convert.mapping.{RDF_Quads_Mapper, RDF_Triples_Mapper, TSD_Mapper}
1111
import org.dbpedia.databus.client.main.CLI_Config
1212
import org.dbpedia.databus.client.sparql.QueryHandler
1313
import org.slf4j.LoggerFactory
1414

1515
import scala.util.control.Breaks.{break, breakable}
1616
import org.apache.jena.graph.Triple
1717
import org.apache.spark.rdd.RDD
18+
19+
import java.net.URLEncoder
1820
/**
1921
* Converter for tsv, csv and several RDF serializations (nt,ttl,rdfxml,json-ld, nq, trix, trig)
2022
*/
@@ -56,29 +58,28 @@ object FormatConverter {
5658
val tripleHandler = new TripleHandler()
5759

5860
//read process
59-
val triples:Array[RDD[Triple]] = {
60-
if (RDF_TRIPLES.contains(conf.inputFormat)) {
61-
Array(tripleHandler.read(file.pathAsString, conf.inputFormat))
62-
}
63-
if (RDF_QUADS.contains(conf.inputFormat)) {
64-
val quads = new QuadsHandler().read(file.pathAsString, conf.inputFormat)
65-
RDF_Quads_Mapper.map_to_triples(quads)
66-
}
67-
else Array(TSD_Mapper.map_to_triples(file, conf))
68-
}
69-
70-
//write process
71-
if (triples.length>1){
72-
tripleHandler.write(triples.head, conf.outputFormat)
73-
} else {
74-
var i=1
75-
triples.foreach(rdd => {
76-
val convertedFile = tripleHandler.write(rdd, conf.outputFormat)
77-
convertedFile.moveTo(targetTempDir / s"$i.${conf.outputFormat}")
78-
i+=1
61+
if (RDF_QUADS.contains(conf.inputFormat)) {
62+
val quads = new QuadsHandler().read(file.pathAsString, conf.inputFormat)
63+
val triples = RDF_Quads_Mapper.map_to_triples(quads)
64+
65+
triples.foreach(triplesResult => {
66+
val convertedFile = tripleHandler.write(triplesResult.graph, conf.outputFormat)
67+
val outFile = targetTempDir / s"${conf.outFile.nameWithoutExtension}_graph=${URLEncoder.encode(triplesResult.graphName, "UTF-8")}.${conf.outputFormat}"
68+
convertedFile.moveTo(outFile)
7969
})
8070

8171
targetTempDir
72+
} else {
73+
val triples:Array[RDD[Triple]] = {
74+
if (RDF_TRIPLES.contains(conf.inputFormat)) {
75+
Array(tripleHandler.read(file.pathAsString, conf.inputFormat))
76+
}
77+
else { //TSD.contains(conf.inputFormat)
78+
Array(TSD_Mapper.map_to_triples(file, conf))
79+
}
80+
}
81+
82+
tripleHandler.write(triples.head, conf.outputFormat)
8283
}
8384

8485
}
@@ -89,41 +90,30 @@ object FormatConverter {
8990
//read process
9091
val quads = {
9192
if (RDF_QUADS.contains(conf.inputFormat)) quadsHandler.read(file.pathAsString, conf.inputFormat)
92-
else RDF_Triples_Mapper.map_to_quads(new TripleHandler().read(file.pathAsString, conf.inputFormat))
93+
else RDF_Triples_Mapper.map_to_quads(new TripleHandler().read(file.pathAsString, conf.inputFormat), conf.graphURI)
9394
}
9495

9596
//write process
9697
quadsHandler.write(quads, conf.outputFormat)
9798
}
99+
98100
else { // convert to Tabular structured data (TSD)
99101
val tsdHandler = new TSDHandler(conf.delimiter)
100102

101103
//read process
102104
val data = {
103-
if (TSD.contains(conf.inputFormat)) Array(tsdHandler.read(file.pathAsString, conf.inputFormat))
105+
if (TSD.contains(conf.inputFormat)) tsdHandler.read(file.pathAsString, conf.inputFormat)
104106
else if (RDF_QUADS.contains(conf.inputFormat)) {
105107
val quads = new QuadsHandler().read(file.pathAsString, conf.inputFormat)
106108
RDF_Quads_Mapper.map_to_tsd(quads, conf.createMapping)
107109
}
108110
else { //RDF_TRIPLES.contains(conf.inputFormat)
109111
val triples = new TripleHandler().read(file.pathAsString, conf.inputFormat)
110-
Array(RDF_Triples_Mapper.map_to_tsd(triples, conf.createMapping))
112+
RDF_Triples_Mapper.map_to_tsd(triples, conf.createMapping)
111113
}
112114
}
113115

114-
//write
115-
if (data.length==1){
116-
tsdHandler.write(data.head, conf.outputFormat)
117-
} else {
118-
var i=1
119-
data.foreach(rdd => {
120-
val convertedFile = tsdHandler.write(rdd, conf.outputFormat)
121-
convertedFile.moveTo(targetTempDir / s"$i.${conf.outputFormat}")
122-
i+=1
123-
})
124-
125-
targetTempDir
126-
}
116+
tsdHandler.write(data, conf.outputFormat)
127117
}
128118
}
129119
// FileUtil.unionFiles(tempDir, targetFile)

src/main/scala/org/dbpedia/databus/client/filehandling/convert/format/tsd/TSDHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import org.apache.spark.rdd.RDD
77
import org.apache.spark.sql.{DataFrame, SparkSession}
88
import org.dbpedia.databus.client.filehandling.convert.format.tsd.format.{CSV, TSV}
99
import org.dbpedia.databus.client.filehandling.convert.format.{EquivalenceClassHandler, tsd}
10-
import org.dbpedia.databus.client.filehandling.convert.mapping.MappingInfo
10+
import org.dbpedia.databus.client.filehandling.convert.mapping.util.MappingInfo
1111

1212
/**
1313
* object to handle csv and tsv files

src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/RDF_Quads_Mapper.scala

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package org.dbpedia.databus.client.filehandling.convert.mapping
33
import org.apache.spark.rdd.RDD
44
import org.apache.jena.graph.Triple
55
import org.apache.jena.sparql.core.Quad
6-
import org.apache.spark.sql.DataFrame
6+
import org.apache.spark.sql.functions.lit
7+
import org.apache.spark.sql.{Column, DataFrame}
78
import org.dbpedia.databus.client.filehandling.convert.Spark
9+
import org.dbpedia.databus.client.filehandling.convert.mapping.util.TriplesResult
810

911
object RDF_Quads_Mapper {
1012

11-
def map_to_triples(data:RDD[Quad]):Array[RDD[Triple]]={
13+
def map_to_triples(data:RDD[Quad]):Array[TriplesResult]={
1214
val graphs = data
1315
.groupBy(quad => quad.getGraph)
1416
.map(_._2)
@@ -17,16 +19,30 @@ object RDF_Quads_Mapper {
1719
graphs.map(iterable => {
1820
var data: Seq[Triple] = Seq.empty
1921
val iterator = iterable.iterator
22+
var graphName = ""
2023
while (iterator.hasNext) {
21-
data = data :+ iterator.next().asTriple()
24+
val quad = iterator.next()
25+
graphName = quad.getGraph.toString
26+
data = data :+ quad.asTriple()
2227
}
23-
Spark.context.parallelize(data)
28+
new TriplesResult(graphName, Spark.context.parallelize(data))
2429
})
2530

2631
}
2732

28-
def map_to_tsd(data:RDD[Quad], createMapping:Boolean):Array[DataFrame]={
33+
def map_to_tsd(data:RDD[Quad], createMapping:Boolean):DataFrame={
2934
val triplesData = map_to_triples(data)
30-
triplesData.map(graph => RDF_Triples_Mapper.map_to_tsd(graph, createMapping))
35+
val dataFrameForEachGraph = triplesData.map(triplesResult => {
36+
val dataFrame = RDF_Triples_Mapper.map_to_tsd(triplesResult.graph, createMapping)
37+
dataFrame.show()
38+
dataFrame.withColumn("graph", lit(triplesResult.graphName))
39+
})
40+
41+
val resultDataFrame = dataFrameForEachGraph.head
42+
43+
dataFrameForEachGraph.foreach()
44+
df1.join(df2, df1.col("column").equalTo(df2("column")))
45+
dataFrameForEachGraph.reduce(_ join _)
3146
}
3247
}
48+

src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/RDF_Triples_Mapper.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ import org.apache.jena.sparql.core.Quad
88
import org.apache.spark.sql.types.{StringType, StructField, StructType}
99
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
1010
import org.dbpedia.databus.client.filehandling.convert.Spark
11+
import org.dbpedia.databus.client.filehandling.convert.mapping.util.Tarql_Writer
1112

1213
object RDF_Triples_Mapper {
1314

1415
val tempDir:File = File("./target/databus.tmp/temp/")
1516

16-
def map_to_quads(data:RDD[Triple]): RDD[Quad] ={
17-
data.map(triple => Quad.create(NodeFactory.createBlankNode(), triple))
17+
def map_to_quads(data:RDD[Triple], graphName:String): RDD[Quad] = {
18+
if (graphName == "DefaultGraph") data.map(triple => Quad.create(Quad.defaultGraphIRI, triple))
19+
else data.map(triple => Quad.create(NodeFactory.createURI(graphName), triple))
1820
}
1921

2022
/**

src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/TSD_Mapper.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.apache.parquet.io.InputFile
1313
import org.dbpedia.databus.client.filehandling.CompileConfig
1414
import org.dbpedia.databus.client.filehandling.convert.Spark
1515
import org.dbpedia.databus.client.filehandling.convert.format.tsd
16+
import org.dbpedia.databus.client.filehandling.convert.mapping.util.MappingInfo
1617
import org.deri.tarql.{CSVOptions, TarqlParser, TarqlQueryExecutionFactory}
1718
import org.slf4j.LoggerFactory
1819

src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/MappingInfo.scala renamed to src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/util/MappingInfo.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.dbpedia.databus.client.filehandling.convert.mapping
1+
package org.dbpedia.databus.client.filehandling.convert.mapping.util
22

33
class MappingInfo(
44
val mappingFile: String,

src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/Tarql_Writer.scala renamed to src/main/scala/org/dbpedia/databus/client/filehandling/convert/mapping/util/Tarql_Writer.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package org.dbpedia.databus.client.filehandling.convert.mapping
1+
package org.dbpedia.databus.client.filehandling.convert.mapping.util
22

3-
import better.files.File
43
import org.apache.spark.sql.DataFrame
54

65
import java.io.PrintWriter
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.dbpedia.databus.client.filehandling.convert.mapping.util
2+
3+
import org.apache.jena.graph.Triple
4+
import org.apache.spark.rdd.RDD
5+
6+
class TriplesResult(val graphName: String, val graph: RDD[Triple]) {}

0 commit comments

Comments
 (0)