Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ addCommandAlias("rctc", "reload; ctc")

// ### Dependencies ###

val zioVersion = "2.1.25"

lazy val testKitLibs = Seq(
"org.scalacheck" %% "scalacheck" % "1.19.0",
"org.scalactic" %% "scalactic" % "3.2.20",
"org.scalatest" %% "scalatest" % "3.2.20",
"dev.zio" %% "zio-test" % zioVersion,
"dev.zio" %% "zio-test-sbt" % zioVersion,
).map(_ % Test)

lazy val poi =
Expand All @@ -37,15 +38,6 @@ lazy val poi =
)
)("4.1.0")

lazy val monix =
(
(version: String) =>
Seq(
"io.monix" %% "monix-execution" % version,
"io.monix" %% "monix-eval" % version,
)
)("3.4.1")

// ### Modules ###

lazy val root =
Expand All @@ -61,7 +53,9 @@ lazy val core =
.settings(stdSettings *)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"io.github.kantan-scala" %% "kantan.csv" % "0.11.0",
"com.github.pathikrit" %% "better-files" % "3.9.2",
) ++ monix ++ poi ++ testKitLibs
) ++ poi ++ testKitLibs,
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"),
)
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package com.colisweb.jruby.concurrent.constant.memory.excel

import cats.effect.Resource
import com.colisweb.jruby.concurrent.constant.memory.excel.utils.KantanExtension
import kantan.csv.{CellDecoder, CellEncoder}
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.atomic.Atomic
import org.apache.poi.ss.usermodel.*
import org.apache.poi.ss.util.WorkbookUtil
import org.apache.poi.xssf.streaming.SXSSFWorkbook
import zio.*

import java.io.{File, FileOutputStream}
import java.nio.file.{Files, Path}
import java.util.UUID
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.switch
import scala.collection.immutable.SortedSet
import scala.collection.mutable.ListBuffer
Expand Down Expand Up @@ -68,46 +66,47 @@ object ConcurrentConstantMemoryExcel {

private[excel] type Row = Array[Cell]

private given Codec = Codec.UTF8
private given Scheduler = Scheduler.computation(name = "ConcurrentConstantMemoryExcel-computation")
private given Codec = Codec.UTF8

final val blankCell: Cell = Cell.BlankCell

final def stringCell(value: String): Cell = Cell.StringCell(value)

final def numericCell(value: Double): Cell = Cell.NumericCell(value)

final def newWorkbookState(sheetName: String, headerValues: Array[String]): Atomic[ConcurrentConstantMemoryState] =
Atomic(
final def newWorkbookState(
sheetName: String,
headerValues: Array[String],
): AtomicReference[ConcurrentConstantMemoryState] =
AtomicReference(
Comment thread
guizmaii marked this conversation as resolved.
ConcurrentConstantMemoryState(
sheetName = WorkbookUtil.createSafeSheetName(sheetName),
headerData = headerValues,
tmpDirectory = Files.createTempDirectory(UUID.randomUUID().toString).toFile,
tasks = List.empty,
pages = SortedSet.empty
pages = SortedSet.empty,
)
)

final def addRows(
atomicCms: Atomic[ConcurrentConstantMemoryState],
atomicCms: AtomicReference[ConcurrentConstantMemoryState],
computeRows: => Array[Row],
pageIndex: Int
pageIndex: Int,
): Unit = {
import KantanExtension.arrayEncoder

val tmpCsvFile = java.io.File.createTempFile(UUID.randomUUID().toString, ".csv", atomicCms.get().tmpDirectory)
val newPage = Page(pageIndex, tmpCsvFile.toPath)
val task = Task(tmpCsvFile.writeCsv[Row](computeRows, rfc))
val task = ZIO.attempt(tmpCsvFile.writeCsv[Row](computeRows, rfc))

atomicCms.transform { cms =>
cms.copy(pages = cms.pages + newPage, tasks = cms.tasks :+ task)
}
atomicCms.updateAndGet(cms => cms.copy(pages = cms.pages + newPage, tasks = cms.tasks :+ task))
()
}

final def writeFile(atomicCms: Atomic[ConcurrentConstantMemoryState], fileName: String): Unit = {
final def writeFile(atomicCms: AtomicReference[ConcurrentConstantMemoryState], fileName: String): Unit = {
val cms = atomicCms.get()

def computeWorkbookData(wb: SXSSFWorkbook): Task[Unit] = Task {
def computeWorkbookData(wb: SXSSFWorkbook): Task[Unit] = ZIO.attempt {
val sheet = wb.createSheet(cms.sheetName)
sheet.setDefaultColumnWidth(24)

Expand Down Expand Up @@ -147,37 +146,29 @@ object ConcurrentConstantMemoryExcel {
}

// TODO: Expose the `swallowIOExceptions` parameter in the `writeFile` function ?
def clean(swallowIOExceptions: Boolean = false): Task[Unit] = Task {
def clean(swallowIOExceptions: Boolean = false): Task[Unit] = ZIO.attempt {
import better.files.* // better-files `delete()` method also works on directories, unlike the Java one.
cms.tmpDirectory.toScala.delete(swallowIOExceptions)
()
}

// Used as a Resource to ease the clean of the temporary CSVs created during the tasks calcultation.
val computeIntermediateTmpCsvFiles: Resource[Task, Unit] =
Resource.make(Task.parSequenceUnordered(cms.tasks).flatMap(_ => Task.unit))(_ => clean())

val workbookResource: Resource[Task, SXSSFWorkbook] =
Resource.make {
// We'll manually manage the `flush` to the hard drive.
Task(new SXSSFWorkbook(-1))
}((wb: SXSSFWorkbook) =>
Task {
wb.dispose() // dispose of temporary files backing this workbook on disk. Necessary because not done in the `close()`. See: https://stackoverflow.com/a/50363245
wb.close()
}
)

val fileOutputStreamResource: Resource[Task, FileOutputStream] =
Resource.make(Task(new FileOutputStream(fileName)))(out => Task(out.close()))

computeIntermediateTmpCsvFiles
.use { _ =>
workbookResource.use { wb =>
computeWorkbookData(wb).flatMap(_ => fileOutputStreamResource.use(out => Task(wb.write(out))))
}
}
.runSyncUnsafe()
val program: ZIO[Scope, Throwable, Unit] =
for {
_ <- ZIO.acquireRelease(ZIO.collectAllParDiscard(cms.tasks))(_ => clean().orDie)
Comment thread
guizmaii marked this conversation as resolved.
wb <- ZIO.acquireRelease(ZIO.attempt(new SXSSFWorkbook(-1)))(wb =>
ZIO.succeed {
wb.dispose() // dispose of temporary files backing this workbook on disk. Necessary because not done in the `close()`. See: https://stackoverflow.com/a/50363245
wb.close()
}
)
_ <- computeWorkbookData(wb)
out <- ZIO.acquireRelease(ZIO.attempt(new FileOutputStream(fileName)))(out => ZIO.succeed(out.close()))
Comment on lines +159 to +165
Comment on lines +159 to +165
_ <- ZIO.attempt(wb.write(out))
} yield ()

Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe.run(ZIO.scoped(program)).getOrThrowFiberFailure()
}
}

}
Loading