@@ -8,6 +8,8 @@ import org.joda.time.format.PeriodFormat
88import scopt .OParser
99import scala .math ._
1010import cats .effect .kernel .Ref
11+ import cats .effect .unsafe .implicits .global
12+
1113
1214object MainGlucosinolates extends IOApp {
1315
@@ -27,34 +29,57 @@ object MainGlucosinolates extends IOApp {
2729
/** scopt builder bound to [[Config]]; its DSL members are imported inside `parser1`. */
private val builder = OParser.builder[Config]

/**
 * Command-line grammar of the tool.
 *
 * Required: `-o/--output` (output file) and the positional `<file>` (input mzXML file).
 * Optional: the retention-time window (`-s/--startRT`, `-e/--endRT`), the intensity
 * threshold (`-i/--minIntensity`) and the search parameters (`--deltaMp0Mp2`,
 * `--carbonMin`, `--carbonMax`, `--sulfurMin`, `--sulfurMax`, `--precisionMz`).
 *
 * Each option copies its parsed value into the matching [[Config]] field. A final
 * `checkConfig` rule rejects a retention-time window whose start lies after its end,
 * so the mistake is reported at parse time instead of producing an empty result.
 */
private val parser1 = {
  import builder._
  OParser.sequence(
    programName("MainGlucosinolates"),
    head("MainGlucosinolates", "1.0"),
    opt[String]('o', "output")
      .required()
      .action((x, c) => c.copy(outputFile = Some(x)))
      .text("Output file"),
    opt[Double]('s', "startRT")
      .optional()
      .action((x, c) => c.copy(startRT = Some(x)))
      .text("Start retention time"),
    opt[Double]('e', "endRT")
      .optional()
      .action((x, c) => c.copy(endRT = Some(x)))
      .text("End retention time"),
    opt[Double]('i', "minIntensity")
      .optional()
      .action((x, c) => c.copy(minIntensity = x))
      .text("Minimum intensity threshold"),
    opt[Double]("deltaMp0Mp2")
      .optional()
      .action((x, c) => c.copy(deltaMp0Mp2 = x))
      .text("Delta between M0 and M2 peaks"),
    opt[Int]("carbonMin")
      .optional()
      .action((x, c) => c.copy(numberCarbonMin = x))
      .text("Minimum number of carbon atoms"),
    opt[Int]("carbonMax")
      .optional()
      .action((x, c) => c.copy(numberCarbonMax = x))
      .text("Maximum number of carbon atoms"),
    opt[Double]("sulfurMin")
      .optional()
      .action((x, c) => c.copy(numberSulfurMin = x))
      .text("Minimum number of sulfur atoms"),
    opt[Double]("sulfurMax")
      .optional()
      .action((x, c) => c.copy(numberSulfurMax = x))
      .text("Maximum number of sulfur atoms"),
    opt[Double]("precisionMz")
      .optional()
      .action((x, c) => c.copy(precisionMz = x))
      .text("Precision for m/z matching"),
    arg[String]("<file>")
      .required()
      .action((x, c) => c.copy(mzFile = Some(x)))
      .text("Input mzXML file"),
    help("help").text("prints this usage text"),
    // Only fires when both bounds were given explicitly; a single bound is always valid.
    checkConfig { c =>
      (c.startRT, c.endRT) match {
        case (Some(start), Some(end)) if start > end =>
          failure(s"startRT ($start) must not be greater than endRT ($end)")
        case _ => success
      }
    }
  )
}
5883 def run (args : List [String ]): IO [ExitCode ] = {
5984 OParser .parse(parser1, args, Config ()) match {
6085 case Some (config) =>
@@ -63,54 +88,79 @@ object MainGlucosinolates extends IOApp {
6388 IO .raiseError(new IllegalArgumentException (" Failed to parse arguments" )).as(ExitCode .Error )
6489 }
6590 }
66-
/**
 * Runs the whole analysis described by `config`.
 *
 * Steps:
 *  1. counts the MS1 spectra inside the retention-time window (first pass over the file),
 *  2. processes each of them with `processSpectrum`, printing a progress line per
 *     spectrum (second pass),
 *  3. sorts all hits by decreasing M0 m/z and writes them as a `;`-separated CSV to
 *     `config.outputFile` (falling back to `output.csv`),
 *  4. prints the elapsed wall-clock time.
 *
 * @param config parsed command-line configuration
 * @return an `IO` that fails with `IllegalArgumentException` when `config.mzFile` is empty
 */
private def processFile(config: Config): IO[Unit] =
  for {
    mzXMLFile <- IO.fromOption(config.mzFile)(new IllegalArgumentException("Missing mzXML file"))
    startTime = config.startRT.getOrElse(0.0)
    endTime = config.endRT.getOrElse(Double.MaxValue)

    // IO.realTime is already the time elapsed since the epoch, so it is used directly as
    // the instant. Adding it to DateTime.now() would double every timestamp and therefore
    // double the reported duration.
    processStart <- IO.realTime.map(rt => new DateTime(rt.toMillis))
    _ <- IO.println(s"Start analyze: $mzXMLFile")

    // First pass: the total is needed up front to print a percentage during processing.
    totalSpectra <- spectraInWindow(mzXMLFile, startTime, endTime).compile.count
    progressRef <- Ref[IO].of(0L)

    // Second pass: one result sequence per spectrum.
    results <- spectraInWindow(mzXMLFile, startTime, endTime)
      .evalMap { spectrum =>
        for {
          current <- progressRef.updateAndGet(_ + 1)
          percent = (current.toDouble / totalSpectra * 100).round
          _ <- IO.println(s"Progress: $percent% ($current/$totalSpectra)")
          result <- IO.blocking(processSpectrum(config, spectrum))
        } yield result
      }
      .compile
      .toList

    // `results` holds one entry per processed spectrum, so this count is the number of
    // spectra; the number of individual hits is reported after flattening below.
    _ <- IO.println(s"Processing completed. Number of results: ${results.length}")
    sortedResults = results.flatten.sortBy { case (_, ((mz0, _), _, _)) => -mz0 }

    _ <- IO.println(s"Sorting completed. Number of results: ${sortedResults.size}")

    csvHeader = "RetentionTime;M0_mz;M0_intensity;M1_mz;M1_intensity;M2_mz;M2_intensity"
    // Rows are formatted on the typed list, so no untyped catch-all case is needed.
    csvRows = sortedResults.map { case (rt, ((mz0, int0), (mz1, int1), (mz2, int2))) =>
      s"$rt;$mz0;$int0;$mz1;$int1;$mz2;$int2"
    }

    _ <- Stream
      .emits(csvHeader +: csvRows)
      .map(_ + "\n")
      .through(text.utf8.encode)
      .through(Files[IO].writeAll(Path(config.outputFile.getOrElse("output.csv"))))
      .compile
      .drain

    processEnd <- IO.realTime.map(rt => new DateTime(rt.toMillis))
    duration = new Interval(processStart, processEnd).toPeriod
    _ <- IO.println(s"Duration: ${PeriodFormat.getDefault.print(duration)}")
  } yield ()

/**
 * MS1 spectra of `mzXMLFile` whose retention time lies in `[startTime, endTime]`.
 *
 * Undefined entries of the request and spectra without a retention time are dropped.
 * Declared as a method so that each call issues a fresh `SpectrumRequest`, as the two
 * passes in `processFile` require.
 */
private def spectraInWindow(mzXMLFile: String, startTime: Double, endTime: Double) =
  SpectrumRequest(mzXMLFile)
    .msLevel(1)
    .collect { case Some(spectrum) => spectrum }
    .filter(_.retentionTimeInSeconds.exists(rt => rt >= startTime && rt <= endTime))
110162
111-
112-
113- private def processSpectrum (config : Config , spectrum : Spectrum ): Seq [(Double , ((Double , Double ), (Double , Double ), (Double , Double )))] = {
163+ private def processSpectrum (config : Config , spectrum : Spectrum ): Seq [(Double , ((Double , Double ), (Double , Double ), (Double , Double )))] = {
114164 spectrum.peaks
115165 .filter { case (_, int0) => int0 > config.minIntensity }
116166 .flatMap { case (mz0, int0) =>
0 commit comments