diff --git a/azure-blob-store/docs/AzureBlobStore-batchsource.md b/azure-blob-store/docs/AzureBlobStore-batchsource.md
index a448f20..7b01b81 100644
--- a/azure-blob-store/docs/AzureBlobStore-batchsource.md
+++ b/azure-blob-store/docs/AzureBlobStore-batchsource.md
@@ -3,59 +3,203 @@
Description
-----------
-Batch source to use Microsoft Azure Blob Storage as a source.
+Batch source to read from Microsoft Azure Blob Storage (WASB) or Azure Data Lake Storage Gen2 (ABFS).
Use Case
--------
-This source is used whenever you need to read from Microsoft Azure Blob Storage. For
-example, you may want to read in files from Microsoft Azure Blob Storage, parse them and
-then store them in a Microsoft SQL Server Database.
+This source is used whenever you need to read files from Azure Blob Storage or Azure Data Lake Storage Gen2.
+For example, you may want to read log files from Azure every hour and store the results in a database or
+another storage system.
+
+Supported Path Schemes
+----------------------
+
+| Scheme | Driver | Typical Use |
+|--------|--------|-------------|
+| `wasb://` | NativeAzureFileSystem (WASB) | Azure Blob Storage |
+| `wasbs://` | NativeAzureFileSystem (WASB, SSL) | Azure Blob Storage (secure) |
+| `abfs://` | AzureBlobFileSystem (ABFS) | Azure Data Lake Storage Gen2 |
+| `abfss://` | SecureAzureBlobFileSystem (ABFS, SSL) | Azure Data Lake Storage Gen2 (secure) |
+
+Supported Authentication Methods
+---------------------------------
+
+| Method | Works with | Credentials required |
+|--------|-----------|----------------------|
+| Storage Account Key | wasb, wasbs, abfs, abfss | Storage key |
+| SAS Token | wasb, wasbs | SAS token + container |
+| Service Principal | abfs, abfss | Tenant ID, Client ID, Client secret |
+| Managed Identity | abfs, abfss | None (identity from Azure runtime) |
Properties
----------
-**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.
-**Path:** The path on Microsoft Azure Blob Storage to use as input. The path uses filename expansion (globbing) to read
-files. The path must start with `wasb://` or `wasbs://`, for example, `wasb://mycontainer@mystorageaccount.blob.core.windows.net/filename.txt`. (Macro-enabled)
+**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
+
+**Path:** The path to use as input. Uses filename expansion (globbing) to read files. The path scheme
+determines which driver and authentication methods are available. (Macro-enabled)
+
+- WASB example: `wasb://mycontainer@myaccount.blob.core.windows.net/data/`
+- ABFS example: `abfss://mycontainer@myaccount.dfs.core.windows.net/data/`
+
+**Account:** The Microsoft Azure Storage account FQDN. (Macro-enabled)
+
+- For `wasb://` / `wasbs://` paths, must end with `.blob.core.windows.net`
+ (e.g. `mystorageaccount.blob.core.windows.net`).
+- For `abfs://` / `abfss://` paths, must end with `.dfs.core.windows.net`
+ (e.g. `mystorageaccount.dfs.core.windows.net`).
+
+**Authentication Method:** The method used to authenticate with Azure. Defaults to `Storage Account Key`.
+
+---
+
+### Storage Account Key
+
+**Storage Key:** The base64-encoded storage key for the Azure Storage account.
+Required when authentication method is `Storage Account Key`. (Macro-enabled)
+
+---
+
+### SAS Token
+
+SAS Token authentication is supported for `wasb://` and `wasbs://` paths only.
+
+**SAS Token:** The Shared Access Signature token for the container.
+Required when authentication method is `SAS Token`. (Macro-enabled)
+
+**Container:** The blob container to connect to.
+Required when authentication method is `SAS Token`. (Macro-enabled)
+
+---
+
+### Service Principal (Azure AD)
+
+Service Principal authentication uses Azure Active Directory OAuth2 client credentials.
+Requires `abfs://` or `abfss://` paths.
+
+**Tenant ID:** The Azure Active Directory tenant (directory) ID.
+Required when authentication method is `Service Principal`. (Macro-enabled)
+
+**Client ID:** The application (client) ID of the registered Azure AD service principal.
+Required when authentication method is `Service Principal`. (Macro-enabled)
+
+**Client Secret:** The client secret value generated for the Azure AD service principal.
+Required when authentication method is `Service Principal`. (Macro-enabled)
+
+> The service principal must be assigned the **Storage Blob Data Reader** role (or equivalent)
+> on the storage account or container in Azure IAM.
+
+---
+
+### Managed Identity (Azure AD)
+
+Managed Identity authentication uses the Azure-assigned identity of the compute resource running
+the pipeline (e.g. an Azure VM, AKS node pool, or HDInsight cluster). No credentials are required.
+Requires `abfs://` or `abfss://` paths.
-**Account:** The Microsoft Azure Blob Storage account to use. The account must end with `.blob.core.windows.net`.
-For example, `mystorageaccount.blob.core.windows.net`, where `mystorageaccount` is the Microsoft
-Azure Storage account name. (Macro-enabled)
+> The managed identity must be assigned the **Storage Blob Data Reader** role (or equivalent)
+> on the storage account or container in Azure IAM.
-**Authentication Method:** The authentication method to use to connect to Microsoft Azure. Can be either
-`Storage Account Key` or `SAS Token`. Defaults to `Storage Account Key`.
+---
-**Storage Key:** The storage key for the container on the Microsoft Azure Storage account.
-Must be a valid base64 encoded storage key provided by Microsoft Azure. Required when authentication method is set
-to `Storage Account Key`. (Macro-enabled)
+**Format:** Format of the data to read. The format must be one of `avro`, `blob`, `csv`, `delimited`,
+`json`, `parquet`, `text`, `tsv`, or `xls`.
+- `blob`: Every input file is read into a separate record. Requires a schema with a field named `body` of type `bytes`.
+- `text`: Files are read line by line. Requires a schema with a field named `body` of type `string`.
+- `csv`: Comma-separated values. Supports header row and quoted values.
+- `tsv`: Tab-separated values. Supports header row and quoted values.
+- `delimited`: Delimiter-separated values using a configurable delimiter.
+- `avro`: Apache Avro binary format. Schema is read from the file.
+- `parquet`: Apache Parquet columnar format. Schema is read from the file.
+- `json`: JSON objects, one per line.
+- `xls`: Microsoft Excel spreadsheet format.
-**SAS Token:** The SAS token to use to connect to the specified container. Required when authentication method is set
-to `SAS Token`. (Macro-enabled)
+**Sample Size:** The maximum number of rows investigated for automatic data type detection.
+Default is 1000. Only used when format is `xls`.
-**Container:** The container to connect to. Required when authentication method is set to`SAS Token`. (Macro-enabled)
+**Override:** Per-column data type overrides that skip automatic type detection.
+Only used when format is `xls`.
-**Ignore Non-Existing Folders:** Identify if path needs to be ignored or not, for case when directory or file does not
-exists. If set to true it will treat the not present folder as 0 input and log a warning. Default is `false`.
+**Delimiter:** The delimiter to use when format is `delimited`. Ignored for other formats.
-**Recursive:** Boolean value to determine if files are to be read recursively from the path. Default is `false`.
+**Enable Quoted Values:** Whether to treat content between quotes as a value. Applies to `csv`, `tsv`,
+and `delimited` formats.
-Example
--------
+**Use First Row as Header:** Whether to use the first row as a header. Applies to `csv`, `tsv`,
+`delimited`, and `xls` formats.
-This example connects to Microsoft Azure Blob Storage and reads in files found in the
-specified directory. This example uses Microsoft Azure Storage 'mystorageaccount.blob.core.windows.net', using the
-'mystorageaccount' account name:
+**Terminate Reading After Empty Row:** Stop reading after the first empty row. Defaults to false.
+Only used when format is `xls`.
+
+**Select Sheet Using / Sheet Value:** Sheet selection by name or number (0-based). Only used when format is `xls`.
+
+**Maximum Split Size:** Maximum size in bytes for each input partition. Default is 128MB.
+
+**Regex Path Filter:** Regular expression that file paths must match to be included in the input.
+
+**Path Field:** Output field to store the file path each record was read from.
+Must be a string field in the output schema.
+
+**Path Filename Only:** Whether to use only the filename instead of the full URI in the path field.
+Default is false.
+
+**Read Files Recursively:** Whether to read files recursively from the path. Default is false.
+
+**Allow Empty Input:** Whether to allow an input path with no data. When false, the plugin errors if
+there is no data. Default is false.
+
+**File System Properties:** Additional Hadoop filesystem properties as a JSON object. (Macro-enabled)
+
+**File Encoding:** Character encoding for the file(s). Default is UTF-8.
+
+Examples
+--------
+
+### Storage Account Key (WASB)
{
"name": "AzureBlobStore",
"type": "batchsource",
"properties": {
- "path": "`wasb://mycontainer@mystorageaccount.blob.core.windows.net/filename.txt",
- "account": "mystorageaccount.blob.core.windows.net",
+ "referenceName": "azure_blob_input",
+ "path": "wasb://mycontainer@myaccount.blob.core.windows.net/data/",
+ "account": "myaccount.blob.core.windows.net",
"authenticationMethod": "storageAccountKey",
"storageKey": "XXXXXEEESSS/YYYY=",
- "ignoreNonExistingFolders": "false",
- "recursive": "false"
+ "format": "csv",
+ "skipHeader": "true"
+ }
+ }
+
+### Service Principal (ABFS)
+
+ {
+ "name": "AzureBlobStore",
+ "type": "batchsource",
+ "properties": {
+ "referenceName": "adls_gen2_input",
+ "path": "abfss://mycontainer@myaccount.dfs.core.windows.net/data/",
+ "account": "myaccount.dfs.core.windows.net",
+ "authenticationMethod": "servicePrincipal",
+ "tenantId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
+ "clientId": "yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
+ "clientSecret": "your-client-secret",
+ "format": "parquet",
+ "recursiveRead": "true"
+ }
+ }
+
+### Managed Identity (ABFS)
+
+ {
+ "name": "AzureBlobStore",
+ "type": "batchsource",
+ "properties": {
+ "referenceName": "adls_gen2_input",
+ "path": "abfss://mycontainer@myaccount.dfs.core.windows.net/data/",
+ "account": "myaccount.dfs.core.windows.net",
+ "authenticationMethod": "managedIdentity",
+ "format": "json"
}
- }
\ No newline at end of file
+ }
diff --git a/azure-blob-store/pom.xml b/azure-blob-store/pom.xml
index 8900f01..6ab9aa6 100644
--- a/azure-blob-store/pom.xml
+++ b/azure-blob-store/pom.xml
@@ -31,14 +31,24 @@
io.cdap.plugin
- filesource-common
- ${project.parent.version}
+ format-common
+ ${cdap.plugin.version}
org.apache.hadoop
hadoop-azure
${hadoop.version}
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.databind.version}
+
+
+ org.apache.commons
+ commons-lang3
+ 3.14.0
+
diff --git a/azure-blob-store/src/main/java/io/cdap/plugin/source/AzureBatchSource.java b/azure-blob-store/src/main/java/io/cdap/plugin/source/AzureBatchSource.java
index 8fa14dd..090a9ec 100644
--- a/azure-blob-store/src/main/java/io/cdap/plugin/source/AzureBatchSource.java
+++ b/azure-blob-store/src/main/java/io/cdap/plugin/source/AzureBatchSource.java
@@ -17,128 +17,347 @@
package io.cdap.plugin.source;
import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSource;
-import io.cdap.plugin.common.AbstractFileBatchSource;
-import io.cdap.plugin.common.FileSourceConfig;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.plugin.common.Asset;
+import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.format.input.PathTrackingInputFormat;
+import io.cdap.plugin.format.plugin.AbstractFileSource;
+import io.cdap.plugin.format.plugin.AbstractFileSourceConfig;
+import java.lang.reflect.Type;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
- * {@link BatchSource} for Azure Blob Store.
+ * {@link BatchSource} for Azure Blob Store and Azure Data Lake Storage Gen2.
+ *
+ * Supported path schemes:
+ * wasb:// / wasbs:// — Azure Blob Storage (WASB driver)
+ * abfs:// / abfss:// — Azure Data Lake Storage Gen2 (ABFS driver)
+ *
+ * Supported authentication methods:
+ * storageAccountKey — shared key; works with wasb and abfs paths
+ * sasToken — SAS token; wasb paths only
+ * servicePrincipal — Azure AD OAuth2 client credentials; abfs paths only
+ * managedIdentity — Azure AD managed identity; abfs paths only
*/
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name("AzureBlobStore")
-@Description("Batch source to read from Azure Blob Storage.")
-public class AzureBatchSource extends AbstractFileBatchSource {
- private static final String PATH = "path";
- private static final String ACCOUNT = "account";
- private static final String AUTHENTICATION_METHOD = "authenticationMethod";
- private static final String STORAGE_ACCOUNT_KEY = "storageKey";
- private static final String SAS_TOKEN = "sasToken";
- private static final String CONTAINER = "container";
- private static final String STORAGE_ACCOUNT_KEY_AUTH_METHOD = "storageAccountKey";
- private static final String SAS_TOKEN_AUTH_METHOD = "sasToken";
-
- @SuppressWarnings("unused")
+@Description("Batch source to read from Azure Blob Storage or Azure Data Lake Storage Gen2.")
+public class AzureBatchSource extends AbstractFileSource {
+
private final AzureBatchConfig config;
+ private Asset asset;
public AzureBatchSource(AzureBatchConfig config) {
super(config);
this.config = config;
}
+ @Override
+ public void prepareRun(BatchSourceContext context) throws Exception {
+ asset = Asset.builder(config.getReferenceName())
+ .setFqn(config.getPath()).build();
+ super.prepareRun(context);
+ }
+
+ @Override
+ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
+ return new LineageRecorder(context, asset);
+ }
+
+ @Override
+ protected Map getFileSystemProperties(BatchSourceContext context) {
+ Map properties = new HashMap<>(config.getFilesystemProperties());
+
+ String path = config.getPath();
+
+ if (path != null && (path.startsWith("wasb://") || path.startsWith("wasbs://"))) {
+ properties.put("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
+ properties.put("fs.wasb.impl.disable.cache", "true");
+ properties.put("fs.wasbs.impl.disable.cache", "true");
+ properties.put("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb");
+ } else if (path != null && (path.startsWith("abfs://") || path.startsWith("abfss://"))) {
+ properties.put("fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem");
+ properties.put("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem");
+ properties.put("fs.AbstractFileSystem.abfs.impl", "org.apache.hadoop.fs.azurebfs.Abfs");
+ properties.put("fs.AbstractFileSystem.abfss.impl", "org.apache.hadoop.fs.azurebfs.Abfss");
+ }
+
+ String authMethod = config.authenticationMethod;
+ String account = config.account;
+
+ if (AzureBatchConfig.AUTH_STORAGE_ACCOUNT_KEY.equalsIgnoreCase(authMethod)) {
+ properties.put(String.format("fs.azure.account.key.%s", account), config.storageKey);
+
+ } else if (AzureBatchConfig.AUTH_SAS_TOKEN.equalsIgnoreCase(authMethod)) {
+ properties.put(String.format("fs.azure.sas.%s.%s", config.container, account), config.sasToken);
+
+ } else if (AzureBatchConfig.AUTH_SERVICE_PRINCIPAL.equalsIgnoreCase(authMethod)) {
+ properties.put(String.format("fs.azure.account.auth.type.%s", account), "OAuth");
+ properties.put(String.format("fs.azure.account.oauth.provider.type.%s", account),
+ "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider");
+ properties.put(String.format("fs.azure.account.oauth2.client.endpoint.%s", account),
+ String.format("https://login.microsoftonline.com/%s/oauth2/token", config.tenantId));
+ properties.put(String.format("fs.azure.account.oauth2.client.id.%s", account), config.clientId);
+ properties.put(String.format("fs.azure.account.oauth2.client.secret.%s", account), config.clientSecret);
+
+ } else if (AzureBatchConfig.AUTH_MANAGED_IDENTITY.equalsIgnoreCase(authMethod)) {
+ properties.put(String.format("fs.azure.account.auth.type.%s", account), "OAuth");
+ properties.put(String.format("fs.azure.account.oauth.provider.type.%s", account),
+ "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider");
+ }
+
+ if (config.shouldCopyHeader()) {
+ properties.put(PathTrackingInputFormat.COPY_HEADER, "true");
+ }
+ if (config.getFileEncoding() != null && !config.getFileEncoding().equals(config.getDefaultFileEncoding())) {
+ properties.put(PathTrackingInputFormat.SOURCE_FILE_ENCODING, config.getFileEncoding());
+ }
+ return properties;
+ }
+
+ @Override
+ protected void recordLineage(LineageRecorder lineageRecorder, List outputFields) {
+ lineageRecorder.recordRead("Read", "Read from Azure Blob Storage.", outputFields);
+ }
+
+ @Override
+ protected boolean shouldGetSchema() {
+ return !config.containsMacro(AzureBatchConfig.NAME_PATH)
+ && !config.containsMacro("format")
+ && !config.containsMacro("delimiter")
+ && !config.containsMacro(AzureBatchConfig.NAME_ACCOUNT)
+ && !config.containsMacro(AzureBatchConfig.NAME_STORAGE_KEY)
+ && !config.containsMacro(AzureBatchConfig.NAME_SAS_TOKEN)
+ && !config.containsMacro(AzureBatchConfig.NAME_TENANT_ID)
+ && !config.containsMacro(AzureBatchConfig.NAME_CLIENT_ID)
+ && !config.containsMacro(AzureBatchConfig.NAME_CLIENT_SECRET)
+ && !config.containsMacro(AzureBatchConfig.NAME_FILE_SYSTEM_PROPERTIES);
+ }
+
/**
* Plugin config for {@link AzureBatchSource}.
*/
- public static class AzureBatchConfig extends FileSourceConfig {
- @Description("Path to file(s) to be read. If a directory is specified,terminate the path name with a '/'. " +
- "The path must start with `wasb://` or `wasbs://`.")
+ public static class AzureBatchConfig extends AbstractFileSourceConfig {
+
+ public static final String NAME_PATH = "path";
+ public static final String NAME_ACCOUNT = "account";
+ public static final String NAME_STORAGE_KEY = "storageKey";
+ public static final String NAME_SAS_TOKEN = "sasToken";
+ public static final String NAME_TENANT_ID = "tenantId";
+ public static final String NAME_CLIENT_ID = "clientId";
+ public static final String NAME_CLIENT_SECRET = "clientSecret";
+ public static final String NAME_FILE_SYSTEM_PROPERTIES = "fileSystemProperties";
+
+ static final String AUTH_STORAGE_ACCOUNT_KEY = "storageAccountKey";
+ static final String AUTH_SAS_TOKEN = "sasToken";
+ static final String AUTH_SERVICE_PRINCIPAL = "servicePrincipal";
+ static final String AUTH_MANAGED_IDENTITY = "managedIdentity";
+
+ private static final String NAME_AUTHENTICATION_METHOD = "authenticationMethod";
+ private static final String NAME_CONTAINER = "container";
+ private static final Gson GSON = new Gson();
+ private static final Type MAP_STRING_STRING_TYPE = new TypeToken