2929import org .apache .iotdb .commons .pipe .config .constant .PipeProcessorConstant ;
3030import org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant ;
3131import org .apache .iotdb .commons .pipe .config .constant .PipeSourceConstant ;
32+ import org .apache .iotdb .commons .pipe .datastructure .visibility .Visibility ;
3233import org .apache .iotdb .commons .pipe .datastructure .visibility .VisibilityUtils ;
3334import org .apache .iotdb .commons .snapshot .SnapshotProcessor ;
3435import org .apache .iotdb .confignode .conf .ConfigNodeConfig ;
@@ -111,6 +112,15 @@ public boolean validateBeforeCreatingPipePlugin(
111112 final String pluginName , final boolean isSetIfNotExistsCondition ) {
112113 // both build-in and user defined pipe plugin should be unique
113114 if (pipePluginMetaKeeper .containsPipePlugin (pluginName )) {
115+ final PipePluginMeta existedPipePluginMeta =
116+ pipePluginMetaKeeper .getPipePluginMeta (pluginName );
117+ final String loadingFailureMessage = existedPipePluginMeta .getPluginLoadingExceptionMessage ();
118+ if (loadingFailureMessage != null ) {
119+ throw new PipeException (
120+ String .format (
121+ "Failed to create PipePlugin [%s], this PipePlugin exists but failed to load: %s" ,
122+ pluginName , loadingFailureMessage ));
123+ }
114124 if (isSetIfNotExistsCondition ) {
115125 return true ;
116126 }
@@ -177,6 +187,7 @@ public void checkPipePluginExistence(
177187 LOGGER .info (exceptionMessage );
178188 throw new PipeException (exceptionMessage );
179189 }
190+ checkPipePluginAvailabilityForPipeCreation (sourcePluginName , "source" );
180191
181192 final PipeParameters processorParameters = new PipeParameters (processorAttributes );
182193 final String processorPluginName =
@@ -190,6 +201,7 @@ public void checkPipePluginExistence(
190201 LOGGER .warn (exceptionMessage );
191202 throw new PipeException (exceptionMessage );
192203 }
204+ checkPipePluginAvailabilityForPipeCreation (processorPluginName , "processor" );
193205
194206 final PipeParameters sinkParameters = new PipeParameters (sinkAttributes );
195207 final String sinkPluginName =
@@ -204,13 +216,14 @@ public void checkPipePluginExistence(
204216 LOGGER .warn (exceptionMessage );
205217 throw new PipeException (exceptionMessage );
206218 }
219+ checkPipePluginAvailabilityForPipeCreation (sinkPluginName , "sink" );
207220 }
208221
209222 /////////////////////////////// Pipe Plugin Management ///////////////////////////////
210223
211224 public TSStatus createPipePlugin (final CreatePipePluginPlan createPipePluginPlan ) {
225+ final PipePluginMeta pipePluginMeta = createPipePluginPlan .getPipePluginMeta ();
212226 try {
213- final PipePluginMeta pipePluginMeta = createPipePluginPlan .getPipePluginMeta ();
214227 final String pluginName = pipePluginMeta .getPluginName ();
215228 final String className = pipePluginMeta .getClassName ();
216229 final String jarName = pipePluginMeta .getJarName ();
@@ -220,6 +233,22 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
220233 } else {
221234 final String existed = pipePluginMetaKeeper .getPluginNameByJarName (jarName );
222235 if (Objects .nonNull (existed )) {
236+ final PipePluginMeta existedPipePluginMeta =
237+ pipePluginMetaKeeper .getPipePluginMeta (existed );
238+ final String existedLoadingFailureMessage =
239+ existedPipePluginMeta .getPluginLoadingExceptionMessage ();
240+ if (existedLoadingFailureMessage != null ) {
241+ throw new PipeException (
242+ String .format (
243+ "Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s" ,
244+ pluginName , existed , existedLoadingFailureMessage ));
245+ }
246+ if (!pipePluginExecutableManager .hasPluginFileUnderInstallDir (existed , jarName )) {
247+ throw new PipeException (
248+ String .format (
249+ "Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir." ,
250+ pluginName , existed , jarName ));
251+ }
223252 pipePluginExecutableManager .linkExistedPlugin (existed , pluginName , jarName );
224253 computeFromPluginClass (pluginName , className );
225254 } else {
@@ -237,7 +266,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
237266 pipePluginMetaKeeper .addJarNameAndMd5 (jarName , pipePluginMeta .getJarMD5 ());
238267
239268 return new TSStatus (TSStatusCode .SUCCESS_STATUS .getStatusCode ());
240- } catch (final Exception e ) {
269+ } catch (final Throwable e ) {
241270 final String errorMessage =
242271 String .format (
243272 "Failed to execute createPipePlugin(%s) on config nodes, because of %s" ,
@@ -249,7 +278,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
249278 }
250279
251280 private void savePipePluginWithRollback (final CreatePipePluginPlan createPipePluginPlan )
252- throws Exception {
281+ throws Throwable {
253282 final PipePluginMeta pipePluginMeta = createPipePluginPlan .getPipePluginMeta ();
254283 final String pluginName = pipePluginMeta .getPluginName ();
255284 final String className = pipePluginMeta .getClassName ();
@@ -258,15 +287,15 @@ private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePlu
258287 pipePluginExecutableManager .savePluginToInstallDir (
259288 ByteBuffer .wrap (createPipePluginPlan .getJarFile ().getValues ()), pluginName , jarName );
260289 computeFromPluginClass (pluginName , className );
261- } catch (final Exception e ) {
290+ } catch (final Throwable e ) {
262291 // We need to rollback if the creation has failed
263292 pipePluginExecutableManager .removePluginFileUnderLibRoot (pluginName , jarName );
264293 throw e ;
265294 }
266295 }
267296
268297 private void computeFromPluginClass (final String pluginName , final String className )
269- throws Exception {
298+ throws Throwable {
270299 final String pluginDirPath = pipePluginExecutableManager .getPluginsDirPath (pluginName );
271300 final PipePluginClassLoader pipePluginClassLoader =
272301 classLoaderManager .createPipePluginClassLoader (pluginDirPath );
@@ -275,7 +304,7 @@ private void computeFromPluginClass(final String pluginName, final String classN
275304 pipePluginMetaKeeper .addPipePluginVisibility (
276305 pluginName , VisibilityUtils .calculateFromPluginClass (pluginClass ));
277306 classLoaderManager .addPluginAndClassLoader (pluginName , pipePluginClassLoader );
278- } catch (final Exception e ) {
307+ } catch (final Throwable e ) {
279308 try {
280309 pipePluginClassLoader .close ();
281310 } catch (final Exception ignored ) {
@@ -402,37 +431,84 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException {
402431 if (pipePluginMeta .isBuiltin ()) {
403432 continue ;
404433 }
405- final String pluginName = pipePluginMeta .getPluginName ();
406- try {
407- final String pluginDirPath = pipePluginExecutableManager .getPluginsDirPath (pluginName );
408- final PipePluginClassLoader pipePluginClassLoader =
409- classLoaderManager .createPipePluginClassLoader (pluginDirPath );
410- try {
411- final Class <?> pluginClass =
412- Class .forName (pipePluginMeta .getClassName (), true , pipePluginClassLoader );
413- pipePluginMetaKeeper .addPipePluginVisibility (
414- pluginName , VisibilityUtils .calculateFromPluginClass (pluginClass ));
415- classLoaderManager .addPluginAndClassLoader (pluginName , pipePluginClassLoader );
416- } catch (final Throwable e ) {
417- try {
418- pipePluginClassLoader .close ();
419- } catch (final Exception ignored ) {
420- }
421- throw e ;
422- }
423- } catch (final Throwable e ) {
424- LOGGER .warn (
425- "Failed to load plugin class for plugin [{}] when loading snapshot [{}] " ,
426- pluginName ,
427- snapshotFile .getAbsolutePath (),
428- e );
429- }
434+ createPipePluginOnStartup (pipePluginMeta , snapshotFile );
430435 }
431436 } finally {
432437 releasePipePluginInfoLock ();
433438 }
434439 }
435440
441+ private String getRootCauseMessage (final Throwable throwable ) {
442+ Throwable current = throwable ;
443+ while (current .getCause () != null && current .getCause () != current ) {
444+ current = current .getCause ();
445+ }
446+ final String message = current .getMessage ();
447+ return current .getClass ().getSimpleName () + (message == null ? "" : (": " + message ));
448+ }
449+
450+ private void checkPipePluginAvailabilityForPipeCreation (
451+ final String pluginName , final String pluginType ) {
452+ final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper .getPipePluginMeta (pluginName );
453+ final String loadingFailureMessage = pipePluginMeta .getPluginLoadingExceptionMessage ();
454+ if (loadingFailureMessage != null ) {
455+ final String exceptionMessage =
456+ String .format (
457+ "Failed to create or alter pipe, the pipe %s plugin %s failed to load: %s" ,
458+ pluginType , pluginName , loadingFailureMessage );
459+ LOGGER .warn (exceptionMessage );
460+ throw new PipeException (exceptionMessage );
461+ }
462+ }
463+
464+ private void createPipePluginOnStartup (
465+ final PipePluginMeta pipePluginMeta , final File snapshotFile ) {
466+ final String pluginName = pipePluginMeta .getPluginName ();
467+ try {
468+ final String pluginDirPath = pipePluginExecutableManager .getPluginsDirPath (pluginName );
469+ final PipePluginClassLoader pipePluginClassLoader =
470+ classLoaderManager .createPipePluginClassLoader (pluginDirPath );
471+ try {
472+ final Class <?> pluginClass =
473+ Class .forName (pipePluginMeta .getClassName (), true , pipePluginClassLoader );
474+ pipePluginMetaKeeper .addPipePluginVisibility (
475+ pluginName , VisibilityUtils .calculateFromPluginClass (pluginClass ));
476+ pipePluginMetaKeeper .addPipePluginMeta (
477+ pluginName ,
478+ new PipePluginMeta (
479+ pipePluginMeta .getPluginName (),
480+ pipePluginMeta .getClassName (),
481+ pipePluginMeta .isBuiltin (),
482+ pipePluginMeta .getJarName (),
483+ pipePluginMeta .getJarMD5 (),
484+ null ));
485+ classLoaderManager .addPluginAndClassLoader (pluginName , pipePluginClassLoader );
486+ } catch (final Throwable e ) {
487+ try {
488+ pipePluginClassLoader .close ();
489+ } catch (final Exception ignored ) {
490+ }
491+ throw e ;
492+ }
493+ } catch (final Throwable e ) {
494+ pipePluginMetaKeeper .addPipePluginMeta (
495+ pluginName ,
496+ new PipePluginMeta (
497+ pipePluginMeta .getPluginName (),
498+ pipePluginMeta .getClassName (),
499+ pipePluginMeta .isBuiltin (),
500+ pipePluginMeta .getJarName (),
501+ pipePluginMeta .getJarMD5 (),
502+ getRootCauseMessage (e )));
503+ pipePluginMetaKeeper .addPipePluginVisibility (pluginName , Visibility .BOTH );
504+ LOGGER .warn (
505+ "Failed to load plugin class for plugin [{}] when loading snapshot [{}] " ,
506+ pluginName ,
507+ snapshotFile .getAbsolutePath (),
508+ e );
509+ }
510+ }
511+
436512 /////////////////////////////// hashCode & equals ///////////////////////////////
437513
438514 @ Override
0 commit comments