33using FlowSynx . Plugins . Csv . Models ;
44using FlowSynx . PluginCore . Extensions ;
55using FlowSynx . Plugins . Csv . Services ;
6+ using CsvHelper . Configuration ;
7+ using CsvHelper ;
8+ using System . Globalization ;
9+ using System . Dynamic ;
610
711namespace FlowSynx . Plugins . Csv ;
812
913public class CsvPlugin : IPlugin
1014{
1115 private IPluginLogger ? _logger ;
12- private ICsvManger _manager = null ! ;
16+ private readonly IGuidProvider _guidProvider ;
17+ private readonly IReflectionGuard _reflectionGuard ;
1318 private CsvPluginSpecifications _csvSenderSpecifications = null ! ;
1419 private bool _isInitialized ;
1520
16- public PluginMetadata Metadata
21+ public CsvPlugin ( ) : this ( new GuidProvider ( ) , new DefaultReflectionGuard ( ) ) { }
22+
23+ internal CsvPlugin ( IGuidProvider guidProvider , IReflectionGuard reflectionGuard )
1724 {
18- get
19- {
20- return new PluginMetadata
21- {
22- Id = Guid . Parse ( "81c99765-9581-4f13-ba77-86c32ae21d97" ) ,
23- Name = "Csv" ,
24- CompanyName = "FlowSynx" ,
25- Description = Resources . PluginDescription ,
26- Version = new PluginVersion ( 1 , 0 , 0 ) ,
27- Category = PluginCategory . Data ,
28- Authors = new List < string > { "FlowSynx" } ,
29- Copyright = "© FlowSynx. All rights reserved." ,
30- Icon = "flowsynx.png" ,
31- ReadMe = "README.md" ,
32- RepositoryUrl = "https://github.com/flowsynx/plugin-csv" ,
33- ProjectUrl = "https://flowsynx.io" ,
34- Tags = new List < string > ( ) { "flowSynx" , "csv" , "comma-separated-values" , "data" , "data-platform" } ,
35- } ;
36- }
25+ _guidProvider = guidProvider ?? throw new ArgumentNullException ( nameof ( guidProvider ) ) ;
26+ _reflectionGuard = reflectionGuard ?? throw new ArgumentNullException ( nameof ( reflectionGuard ) ) ;
3727 }
3828
29+ public PluginMetadata Metadata => new PluginMetadata
30+ {
31+ Id = Guid . Parse ( "81c99765-9581-4f13-ba77-86c32ae21d97" ) ,
32+ Name = "Csv" ,
33+ CompanyName = "FlowSynx" ,
34+ Description = Resources . PluginDescription ,
35+ Version = new PluginVersion ( 1 , 1 , 0 ) ,
36+ Category = PluginCategory . Data ,
37+ Authors = new List < string > { "FlowSynx" } ,
38+ Copyright = "© FlowSynx. All rights reserved." ,
39+ Icon = "flowsynx.png" ,
40+ ReadMe = "README.md" ,
41+ RepositoryUrl = "https://github.com/flowsynx/plugin-csv" ,
42+ ProjectUrl = "https://flowsynx.io" ,
43+ Tags = new List < string > ( ) { "flowSynx" , "csv" , "comma-separated-values" , "data" , "data-platform" } ,
44+ } ;
45+
3946 public PluginSpecifications ? Specifications { get ; set ; }
4047
4148 public Type SpecificationsType => typeof ( CsvPluginSpecifications ) ;
4249
50+ private Dictionary < string , ICsvOperationHandler > OperationMap => new ( StringComparer . OrdinalIgnoreCase )
51+ {
52+ [ "filter" ] = new FilterOperationHandler ( ) ,
53+ [ "map" ] = new MapOperationHandler ( )
54+ } ;
55+
56+ public IReadOnlyCollection < string > SupportedOperations => OperationMap . Keys ;
57+
4358 public Task Initialize ( IPluginLogger logger )
4459 {
4560 if ( ReflectionHelper . IsCalledViaReflection ( ) )
@@ -48,35 +63,117 @@ public Task Initialize(IPluginLogger logger)
4863 ArgumentNullException . ThrowIfNull ( logger ) ;
4964 _csvSenderSpecifications = Specifications . ToObject < CsvPluginSpecifications > ( ) ;
5065 _logger = logger ;
51- _manager = new CsvManager ( logger ) ;
5266 _isInitialized = true ;
5367 return Task . CompletedTask ;
5468 }
5569
56- public Task < object ? > ExecuteAsync ( PluginParameters parameters , CancellationToken cancellationToken )
70+ public async Task < object ? > ExecuteAsync ( PluginParameters parameters , CancellationToken cancellationToken )
5771 {
58- if ( ReflectionHelper . IsCalledViaReflection ( ) )
72+ cancellationToken . ThrowIfCancellationRequested ( ) ;
73+
74+ if ( _reflectionGuard . IsCalledViaReflection ( ) )
5975 throw new InvalidOperationException ( Resources . ReflectionBasedAccessIsNotAllowed ) ;
6076
6177 if ( ! _isInitialized )
6278 throw new InvalidOperationException ( $ "Plugin '{ Metadata . Name } ' v{ Metadata . Version } is not initialized.") ;
6379
64- var operationParameter = parameters . ToObject < OperationParameter > ( ) ;
65- var operation = operationParameter . Operation ;
66-
67- if ( OperationMap . TryGetValue ( operation , out var handler ) )
80+ var inputParameter = parameters . ToObject < InputParameter > ( ) ;
81+ if ( ! OperationMap . TryGetValue ( inputParameter . Operation , out var handler ) )
6882 {
69- return handler ( parameters , cancellationToken ) ;
83+ throw new NotSupportedException ( $ "Operation ' { inputParameter . Operation } ' is not supported." ) ;
7084 }
7185
72- throw new NotSupportedException ( $ "CSV plugin: Operation '{ operation } ' is not supported.") ;
86+ var context = ParseDataToContext ( inputParameter . Data ) ;
87+ var csv = context . Content ?? throw new ArgumentException ( "Input CSV is required." ) ;
88+
89+ using var reader = new StringReader ( csv ) ;
90+ using var csvReader = new CsvReader ( reader , new CsvConfiguration ( CultureInfo . InvariantCulture )
91+ {
92+ Delimiter = inputParameter . Delimiter ?? "," ,
93+ IgnoreBlankLines = inputParameter . IgnoreBlankLines ?? true ,
94+ HasHeaderRecord = true ,
95+ TrimOptions = TrimOptions . Trim ,
96+ DetectColumnCountChanges = true ,
97+ BadDataFound = null
98+ } ) ;
99+
100+ var records = csvReader . GetRecords < dynamic > ( ) . Select ( row =>
101+ {
102+ var expando = new ExpandoObject ( ) as IDictionary < string , object ? > ;
103+ foreach ( var kvp in ( IDictionary < string , object ? > ) row )
104+ {
105+ expando [ kvp . Key ] = kvp . Value ;
106+ }
107+ return ( ExpandoObject ) expando ;
108+ } ) . ToList ( ) ;
109+
110+ var result = handler . Handle ( records , inputParameter ) ;
111+ var csvString = await ToCsvStringAsync ( result , inputParameter ) ;
112+
113+ string filename = $ "{ _guidProvider . NewGuid ( ) } .csv";
114+ return new PluginContext ( filename , "Data" )
115+ {
116+ Format = "Csv" ,
117+ Content = csvString
118+ } ;
73119 }
74120
75- private Dictionary < string , Func < PluginParameters , CancellationToken , Task < object ? > > > OperationMap => new ( StringComparer . OrdinalIgnoreCase )
121+ private PluginContext ParseDataToContext ( object ? data )
76122 {
77- [ "read" ] = async ( parameters , cancellationToken ) => await _manager . Read ( parameters , cancellationToken ) ,
78- [ "write" ] = async ( parameters , cancellationToken ) => { await _manager . Write ( parameters , cancellationToken ) ; return null ; } ,
79- } ;
123+ if ( data is null )
124+ throw new ArgumentNullException ( nameof ( data ) , "Input data cannot be null." ) ;
80125
81- public IReadOnlyCollection < string > SupportedOperations => OperationMap . Keys ;
126+ return data switch
127+ {
128+ PluginContext singleContext => singleContext ,
129+ IEnumerable < PluginContext > => throw new NotSupportedException ( "List of PluginContext is not supported." ) ,
130+ string strData => new PluginContext ( _guidProvider . NewGuid ( ) . ToString ( ) , "Data" ) { Content = strData } ,
131+ _ => throw new NotSupportedException ( "Unsupported input data format." )
132+ } ;
133+ }
134+
135+ private async Task < string > ToCsvStringAsync ( IEnumerable < ExpandoObject > records , InputParameter inputParameter )
136+ {
137+ using var writer = new StringWriter ( ) ;
138+ using var csvWriter = new CsvWriter ( writer , new CsvConfiguration ( CultureInfo . InvariantCulture )
139+ {
140+ Delimiter = inputParameter . Delimiter ?? "," ,
141+ IgnoreBlankLines = inputParameter . IgnoreBlankLines ?? true ,
142+ HasHeaderRecord = true ,
143+ TrimOptions = TrimOptions . Trim ,
144+ DetectColumnCountChanges = true ,
145+ BadDataFound = null
146+ } ) ;
147+
148+ // Write header
149+ var firstRecord = records . FirstOrDefault ( ) ;
150+ if ( firstRecord is not null )
151+ {
152+ var headerRow = ( ( IDictionary < string , object ? > ) firstRecord ) . Keys ;
153+ foreach ( var header in headerRow )
154+ {
155+ csvWriter . WriteField ( header ) ;
156+ _logger ? . LogInfo ( $ "Column: '{ header } '") ;
157+ }
158+ await csvWriter . NextRecordAsync ( ) ;
159+
160+ // Write rows
161+ foreach ( var record in records )
162+ {
163+ var values = ( IDictionary < string , object ? > ) record ;
164+ foreach ( var value in values . Values )
165+ {
166+ csvWriter . WriteField ( value ) ;
167+ _logger ? . LogInfo ( $ "Value: '{ value } '") ;
168+ }
169+ await csvWriter . NextRecordAsync ( ) ;
170+ }
171+ }
172+
173+ await csvWriter . FlushAsync ( ) ;
174+
175+ _logger ? . LogInfo ( writer . ToString ( ) ) ;
176+
177+ return writer . ToString ( ) ;
178+ }
82179}
0 commit comments