|
1 | 1 | package com.colisweb.jruby.concurrent.constant.memory.excel |
2 | 2 |
|
3 | | -import cats.effect.Resource |
4 | 3 | import com.colisweb.jruby.concurrent.constant.memory.excel.utils.KantanExtension |
5 | 4 | import kantan.csv.{CellDecoder, CellEncoder} |
6 | | -import monix.eval.Task |
7 | | -import monix.execution.Scheduler |
8 | | -import monix.execution.atomic.Atomic |
9 | 5 | import org.apache.poi.ss.usermodel.* |
10 | 6 | import org.apache.poi.ss.util.WorkbookUtil |
11 | 7 | import org.apache.poi.xssf.streaming.SXSSFWorkbook |
| 8 | +import zio.* |
12 | 9 |
|
13 | 10 | import java.io.{File, FileOutputStream} |
14 | 11 | import java.nio.file.{Files, Path} |
15 | 12 | import java.util.UUID |
| 13 | +import java.util.concurrent.atomic.AtomicReference |
16 | 14 | import scala.annotation.switch |
17 | 15 | import scala.collection.immutable.SortedSet |
18 | 16 | import scala.collection.mutable.ListBuffer |
@@ -68,46 +66,47 @@ object ConcurrentConstantMemoryExcel { |
68 | 66 |
|
69 | 67 | private[excel] type Row = Array[Cell] |
70 | 68 |
|
71 | | - private given Codec = Codec.UTF8 |
72 | | - private given Scheduler = Scheduler.computation(name = "ConcurrentConstantMemoryExcel-computation") |
| 69 | + private given Codec = Codec.UTF8 |
73 | 70 |
|
74 | 71 | final val blankCell: Cell = Cell.BlankCell |
75 | 72 |
|
76 | 73 | final def stringCell(value: String): Cell = Cell.StringCell(value) |
77 | 74 |
|
78 | 75 | final def numericCell(value: Double): Cell = Cell.NumericCell(value) |
79 | 76 |
|
80 | | - final def newWorkbookState(sheetName: String, headerValues: Array[String]): Atomic[ConcurrentConstantMemoryState] = |
81 | | - Atomic( |
| 77 | + final def newWorkbookState( |
| 78 | + sheetName: String, |
| 79 | + headerValues: Array[String], |
| 80 | + ): AtomicReference[ConcurrentConstantMemoryState] = |
| 81 | + AtomicReference( |
82 | 82 | ConcurrentConstantMemoryState( |
83 | 83 | sheetName = WorkbookUtil.createSafeSheetName(sheetName), |
84 | 84 | headerData = headerValues, |
85 | 85 | tmpDirectory = Files.createTempDirectory(UUID.randomUUID().toString).toFile, |
86 | 86 | tasks = List.empty, |
87 | | - pages = SortedSet.empty |
| 87 | + pages = SortedSet.empty, |
88 | 88 | ) |
89 | 89 | ) |
90 | 90 |
|
91 | 91 | final def addRows( |
92 | | - atomicCms: Atomic[ConcurrentConstantMemoryState], |
| 92 | + atomicCms: AtomicReference[ConcurrentConstantMemoryState], |
93 | 93 | computeRows: => Array[Row], |
94 | | - pageIndex: Int |
| 94 | + pageIndex: Int, |
95 | 95 | ): Unit = { |
96 | 96 | import KantanExtension.arrayEncoder |
97 | 97 |
|
98 | 98 | val tmpCsvFile = java.io.File.createTempFile(UUID.randomUUID().toString, ".csv", atomicCms.get().tmpDirectory) |
99 | 99 | val newPage = Page(pageIndex, tmpCsvFile.toPath) |
100 | | - val task = Task(tmpCsvFile.writeCsv[Row](computeRows, rfc)) |
| 100 | + val task = ZIO.attempt(tmpCsvFile.writeCsv[Row](computeRows, rfc)) |
101 | 101 |
|
102 | | - atomicCms.transform { cms => |
103 | | - cms.copy(pages = cms.pages + newPage, tasks = cms.tasks :+ task) |
104 | | - } |
| 102 | + atomicCms.updateAndGet(cms => cms.copy(pages = cms.pages + newPage, tasks = cms.tasks :+ task)) |
| 103 | + () |
105 | 104 | } |
106 | 105 |
|
107 | | - final def writeFile(atomicCms: Atomic[ConcurrentConstantMemoryState], fileName: String): Unit = { |
| 106 | + final def writeFile(atomicCms: AtomicReference[ConcurrentConstantMemoryState], fileName: String): Unit = { |
108 | 107 | val cms = atomicCms.get() |
109 | 108 |
|
110 | | - def computeWorkbookData(wb: SXSSFWorkbook): Task[Unit] = Task { |
| 109 | + def computeWorkbookData(wb: SXSSFWorkbook): Task[Unit] = ZIO.attempt { |
111 | 110 | val sheet = wb.createSheet(cms.sheetName) |
112 | 111 | sheet.setDefaultColumnWidth(24) |
113 | 112 |
|
@@ -147,37 +146,29 @@ object ConcurrentConstantMemoryExcel { |
147 | 146 | } |
148 | 147 |
|
149 | 148 | // TODO: Expose the `swallowIOExceptions` parameter in the `writeFile` function ? |
150 | | - def clean(swallowIOExceptions: Boolean = false): Task[Unit] = Task { |
| 149 | + def clean(swallowIOExceptions: Boolean = false): Task[Unit] = ZIO.attempt { |
151 | 150 | import better.files.* // better-files `delete()` method also works on directories, unlike the Java one. |
152 | 151 | cms.tmpDirectory.toScala.delete(swallowIOExceptions) |
153 | 152 | () |
154 | 153 | } |
155 | 154 |
|
156 | | - // Used as a Resource to ease the clean of the temporary CSVs created during the tasks calcultation. |
157 | | - val computeIntermediateTmpCsvFiles: Resource[Task, Unit] = |
158 | | - Resource.make(Task.parSequenceUnordered(cms.tasks).flatMap(_ => Task.unit))(_ => clean()) |
159 | | - |
160 | | - val workbookResource: Resource[Task, SXSSFWorkbook] = |
161 | | - Resource.make { |
162 | | - // We'll manually manage the `flush` to the hard drive. |
163 | | - Task(new SXSSFWorkbook(-1)) |
164 | | - }((wb: SXSSFWorkbook) => |
165 | | - Task { |
166 | | - wb.dispose() // dispose of temporary files backing this workbook on disk. Necessary because not done in the `close()`. See: https://stackoverflow.com/a/50363245 |
167 | | - wb.close() |
168 | | - } |
169 | | - ) |
170 | | - |
171 | | - val fileOutputStreamResource: Resource[Task, FileOutputStream] = |
172 | | - Resource.make(Task(new FileOutputStream(fileName)))(out => Task(out.close())) |
173 | | - |
174 | | - computeIntermediateTmpCsvFiles |
175 | | - .use { _ => |
176 | | - workbookResource.use { wb => |
177 | | - computeWorkbookData(wb).flatMap(_ => fileOutputStreamResource.use(out => Task(wb.write(out)))) |
178 | | - } |
179 | | - } |
180 | | - .runSyncUnsafe() |
| 155 | + val program: ZIO[Scope, Throwable, Unit] = |
| 156 | + for { |
| 157 | + _ <- ZIO.acquireRelease(ZIO.collectAllParDiscard(cms.tasks))(_ => clean().orDie) |
| 158 | + wb <- ZIO.acquireRelease(ZIO.attempt(new SXSSFWorkbook(-1)))(wb => |
| 159 | + ZIO.succeed { |
| 160 | + wb.dispose() // dispose of temporary files backing this workbook on disk. Necessary because not done in the `close()`. See: https://stackoverflow.com/a/50363245 |
| 161 | + wb.close() |
| 162 | + } |
| 163 | + ) |
| 164 | + _ <- computeWorkbookData(wb) |
| 165 | + out <- ZIO.acquireRelease(ZIO.attempt(new FileOutputStream(fileName)))(out => ZIO.succeed(out.close())) |
| 166 | + _ <- ZIO.attempt(wb.write(out)) |
| 167 | + } yield () |
| 168 | + |
| 169 | + Unsafe.unsafe { implicit unsafe => |
| 170 | + Runtime.default.unsafe.run(ZIO.scoped(program)).getOrThrowFiberFailure() |
| 171 | + } |
181 | 172 | } |
182 | 173 |
|
183 | 174 | } |
0 commit comments