11from datetime import datetime
22
33import luigi
4- import luigi .contrib .gcs
5- import luigi .contrib .s3
64from luigi .format import Format
75
8- from gokart .gcs_config import GCSConfig
9- from gokart .gcs_zip_client import GCSZipClient
10- from gokart .s3_config import S3Config
11- from gokart .s3_zip_client import S3ZipClient
126from gokart .zip_client import ZipClient
137
8+ try :
9+ from gokart .gcs_config import GCSConfig
10+ from gokart .gcs_zip_client import GCSZipClient
11+
12+ # to avoid warning, import here which means gcs dependencies are exist
13+ import luigi .contrib .gcs # isort: skip
14+ GCS_AVAILABLE = True
15+ except ImportError :
16+ GCS_AVAILABLE = False
17+
18+ try :
19+ from gokart .s3_config import S3Config
20+ from gokart .s3_zip_client import S3ZipClient
21+
22+ # to avoid warning, import here which means s3 dependencies are exist
23+ import luigi .contrib .s3 # isort: skip
24+ S3_AVAILABLE = True
25+ except ImportError :
26+ S3_AVAILABLE = False
27+
1428object_storage_path_prefix = ['s3://' , 'gs://' ]
1529
1630
31+ def assert_gcs_available ():
32+ if GCS_AVAILABLE :
33+ return
34+
35+ raise ImportError ('gs:// is not available. Please install gokart[gcs]' )
36+
37+
38+ def assert_s3_available ():
39+ if S3_AVAILABLE :
40+ return
41+
42+ raise ImportError ('s3:// is not available. Please install gokart[s3]' )
43+
44+
1745class ObjectStorage (object ):
1846
1947 @staticmethod
@@ -26,26 +54,32 @@ def if_object_storage_path(path: str) -> bool:
2654 @staticmethod
2755 def get_object_storage_target (path : str , format : Format ) -> luigi .Target :
2856 if path .startswith ('s3://' ):
57+ assert_s3_available ()
2958 return luigi .contrib .s3 .S3Target (path , client = S3Config ().get_s3_client (), format = format )
3059 elif path .startswith ('gs://' ):
60+ assert_gcs_available ()
3161 return luigi .contrib .gcs .GCSTarget (path , client = GCSConfig ().get_gcs_client (), format = format )
3262 else :
3363 raise
3464
3565 @staticmethod
3666 def exists (path : str ) -> bool :
3767 if path .startswith ('s3://' ):
68+ assert_s3_available ()
3869 return S3Config ().get_s3_client ().exists (path )
3970 elif path .startswith ('gs://' ):
71+ assert_gcs_available ()
4072 return GCSConfig ().get_gcs_client ().exists (path )
4173 else :
4274 raise
4375
4476 @staticmethod
4577 def get_timestamp (path : str ) -> datetime :
4678 if path .startswith ('s3://' ):
79+ assert_s3_available ()
4780 return S3Config ().get_s3_client ().get_key (path ).last_modified
4881 elif path .startswith ('gs://' ):
82+ assert_gcs_available ()
4983 # for gcs object
5084 # should PR to luigi
5185 bucket , obj = GCSConfig ().get_gcs_client ()._path_to_bucket_and_key (path )
@@ -57,12 +91,14 @@ def get_timestamp(path: str) -> datetime:
5791 @staticmethod
5892 def get_zip_client (file_path : str , temporary_directory : str ) -> ZipClient :
5993 if file_path .startswith ('s3://' ):
94+ assert_s3_available ()
6095 return S3ZipClient (file_path = file_path , temporary_directory = temporary_directory )
6196 elif file_path .startswith ('gs://' ):
97+ assert_gcs_available ()
6298 return GCSZipClient (file_path = file_path , temporary_directory = temporary_directory )
6399 else :
64100 raise
65101
66102 @staticmethod
67103 def is_buffered_reader (file : object ):
68- return not isinstance (file , luigi .contrib .s3 .ReadableS3File )
104+ return not ( S3_AVAILABLE and isinstance (file , luigi .contrib .s3 .ReadableS3File ) )
0 commit comments