Iceberg ingestion user guide
Existing CDC vs. Iceberg Workflows
This document outlines the proposed user experience for our Iceberg data ingestion, contrasting it with the existing CDC workflow.
Workflow Comparison
| Aspect | Existing CDC Workflow | Iceberg Workflow |
|---|---|---|
| Authentication | Relies on the application session token to talk to Snowflake databases. | Relies on Programmatic Access Tokens (PATs), exchanged for an Iceberg OAuth token, to talk to the Iceberg catalog. |
| Authorization | Users grant us access via delegation using object reference. | Users grant us access to data (Snowflake DB, schema, table) explicitly. |
| Change Detection | Relies on Snowflake’s native change tracking feature. | Scans the Iceberg catalog to detect changes, for maximum catalog compatibility. |
| Network Boundary | All within Snowflake network boundary. | Requires egress to access Iceberg table storage. |
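To make the change-detection row concrete: scanning the catalog amounts to comparing the table’s current snapshot ID against the last one processed. A minimal, illustrative sketch in Python (the `current-snapshot-id` field follows the Iceberg table-metadata format; the tracking of the last-seen snapshot is hypothetical and not part of this guide’s setup):

```python
import json
from typing import Optional

def detect_change(metadata_json: str, last_seen_snapshot_id: Optional[int]) -> bool:
    """Return True if the table has a snapshot we have not processed yet."""
    metadata = json.loads(metadata_json)
    # "current-snapshot-id" is the field name in Iceberg table metadata.
    current = metadata.get("current-snapshot-id")
    return current is not None and current != last_seen_snapshot_id

# Example: the same metadata document seen on two consecutive polls.
meta = json.dumps({"format-version": 2, "current-snapshot-id": 4219})
print(detect_change(meta, None))   # first poll: treated as changed
print(detect_change(meta, 4219))   # snapshot already processed: no change
```

This is why the approach works against any spec-compliant catalog: it only depends on table metadata, not on Snowflake-specific change tracking.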
Limitations
While the functionality achieves the customer outcomes, there are limitations in the current integration of native Iceberg tables with the RAI Native App. This impacts:
Authentication Model
- Relies on Programmatic Access Tokens (PATs) instead of standard SPCS session tokens.
Network Requirements
- Requires network egress configurations for Iceberg access.
Iceberg user workflow
The workflow requires additional privileges on the consumer side. Since Snowflake does not allow exposing app-owned stored procedures that run with consumer privileges, the functions are exposed via the RAI CLI instead.
Admin Setup for using Iceberg data
All of these SQL statements require elevated privileges:
- Create a database for all configurations.
- Object creation requires the privilege to create that object type, e.g. CREATE ROLE.
- Object modification requires ownership of that object, e.g. OWNERSHIP of that role.
Create database for all privileges configurations
We need a database for user-owned Iceberg utilities and configurations; you can reuse an existing one or create a new one. If you skip this step and reuse an existing database, remember to update the later queries accordingly.
```sql
CREATE DATABASE ICEBERG_UTILS_DB;
USE DATABASE ICEBERG_UTILS_DB;
CREATE SCHEMA UTILS;
CREATE SCHEMA CONFIGS;
```

Create Service User and Role
This user and role are created to generate the PATs used to access the Iceberg REST catalog.
```sql
CREATE OR REPLACE ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;

CREATE OR REPLACE USER HORIZON_REST_SRV_ACCOUNT_USER
  TYPE = SERVICE
  DEFAULT_ROLE = HORIZON_REST_SRV_ACCOUNT_ROLE;

GRANT ROLE HORIZON_REST_SRV_ACCOUNT_ROLE TO USER HORIZON_REST_SRV_ACCOUNT_USER;
```

Create Network Rule and Policy
This network rule and policy block all public (ingress) network access to the service user, restricting its use to within Snowflake.
```sql
USE SCHEMA ICEBERG_UTILS_DB.CONFIGS;

CREATE OR REPLACE NETWORK RULE BLOCK_PUBLIC_ACCESS_NT_RULE
  MODE = INGRESS
  TYPE = IPV4
  VALUE_LIST = ('0.0.0.0/0');

CREATE NETWORK POLICY IF NOT EXISTS BLOCK_PUBLIC_POLICY
  BLOCKED_NETWORK_RULE_LIST = ('BLOCK_PUBLIC_ACCESS_NT_RULE');

ALTER USER HORIZON_REST_SRV_ACCOUNT_USER SET NETWORK_POLICY = 'BLOCK_PUBLIC_POLICY';
```

The service user’s name and role can be customized.
Authorize App to manage the service User
```sql
-- Requires the user-owner privilege; practically, ACCOUNTADMIN.
-- Grants the RAI app permission to manage the user's authentication methods.
GRANT MODIFY PROGRAMMATIC AUTHENTICATION METHODS ON USER HORIZON_REST_SRV_ACCOUNT_USER
  TO APPLICATION RELATIONALAI;
```

Bind External Access Integration (EAI)
```sql
-- Requires the CREATE NETWORK RULE privilege.
CREATE OR REPLACE NETWORK RULE iceberg_nt_rule
  TYPE = HOST_PORT
  MODE = EGRESS
  VALUE_LIST = ();

-- Requires either the CREATE INTEGRATION or CREATE EXTERNAL ACCESS INTEGRATION privilege.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ICEBERG_EXT_ACC_INT
  ALLOWED_NETWORK_RULES = (ICEBERG_UTILS_DB.CONFIGS.iceberg_nt_rule)
  ENABLED = TRUE;

-- Must be called using the ACCOUNTADMIN role.
CALL relationalai.app.update_reference(
  'iceberg_eai',
  'SET',
  SYSTEM$REFERENCE('EXTERNAL ACCESS INTEGRATION', 'ICEBERG_EXT_ACC_INT', 'PERSISTENT', 'USAGE'));
```

Delegates app users to manage network rules
This step only creates the stored procedure and delegates it to data stream users; the actual invocation happens during data registration (next section). This allows elevated privileges to be kept separate from regular data users’ privileges.
```sql
-- Set up the sproc as the account admin so that users can run it.
-- Requires ACCOUNTADMIN.
CREATE OR REPLACE PROCEDURE ICEBERG_UTILS_DB.UTILS.setup_iceberg_network_rule(
    network_rule_name STRING,
    iceberg_table_name STRING)
RETURNS STRING
LANGUAGE SQL
EXECUTE AS OWNER
AS
$$
DECLARE
    iceberg_info VARIANT;
    bucket_name STRING;
    base_s3_url STRING;
    curl_response STRING;
    region STRING;
    regional_s3_url STRING;
    storage_location STRING;
    storage_type STRING;
    storage_host STRING;
    dfs_host STRING;
    current_value_list_str STRING;
    new_value_list_str STRING;
BEGIN
    -- 1. Find out the bucket name.
    iceberg_info := (SELECT PARSE_JSON(SYSTEM$GET_ICEBERG_TABLE_INFORMATION(:iceberg_table_name)));
    storage_location := iceberg_info:metadataLocation::STRING;

    -- Determine the storage type.
    IF (STARTSWITH(storage_location, 's3://')) THEN
        storage_type := 'S3';
        -- Extract the bucket name from the base location.
        -- Example: s3://nsh-ibt/path -> nsh-ibt
        bucket_name := REGEXP_SUBSTR(storage_location, 's3://([^/]+)/', 1, 1, 'e', 1);
        base_s3_url := bucket_name || '.s3.amazonaws.com';
        storage_host := base_s3_url;
    ELSEIF (STARTSWITH(storage_location, 'azure://')) THEN
        storage_type := 'AZURE';
        -- Parse the Azure blob storage URL.
        -- Example: azure://ibtexteranl.blob.core.windows.net/... -> ibtexteranl.blob.core.windows.net
        storage_host := REGEXP_SUBSTR(storage_location, 'azure://([^/]+)', 1, 1, 'e', 1);
    ELSE
        RETURN 'Unsupported storage type: ' || storage_location;
    END IF;

    -- 2. Whitelist the storage host.
    -- Retrieve the current VALUE_LIST using DESCRIBE + RESULT_SCAN.
    EXECUTE IMMEDIATE 'DESCRIBE NETWORK RULE ' || network_rule_name;
    SELECT "value_list" INTO :current_value_list_str FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
    IF (current_value_list_str IS NULL OR TRIM(current_value_list_str) = '') THEN
        new_value_list_str := storage_host;
    ELSE
        -- Only append if not already present.
        IF (POSITION(storage_host IN current_value_list_str) = 0) THEN
            new_value_list_str := current_value_list_str || ',' || storage_host;
        ELSE
            new_value_list_str := current_value_list_str;
        END IF;
    END IF;
    -- Convert the comma-separated string to individually quoted values: 'a,b' -> '''a'',''b'''
    LET quoted_value_list STRING := '''' || REPLACE(new_value_list_str, ',', ''',''') || '''';
    EXECUTE IMMEDIATE 'ALTER NETWORK RULE ' || network_rule_name || ' SET VALUE_LIST = (' || quoted_value_list || ')';

    -- 3. Re-read the current VALUE_LIST after the step 2 update.
    EXECUTE IMMEDIATE 'DESCRIBE NETWORK RULE ' || network_rule_name;
    SELECT "value_list" INTO :current_value_list_str FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
    new_value_list_str := current_value_list_str;

    -- For S3, find the region and add the regional URL.
    IF (storage_type = 'S3') THEN
        -- Equivalent to: curl -I https://s3.amazonaws.com/bucket-name | grep bucket-region
        CALL ICEBERG_UTILS_DB.UTILS.curl_request('https://' || :base_s3_url, 'HEAD', NULL, NULL) INTO :curl_response;
        -- Extract the region from the x-amz-bucket-region header.
        region := REGEXP_SUBSTR(:curl_response, 'x-amz-bucket-region: ([^\n\r]+)', 1, 1, 'ie', 1);
        regional_s3_url := bucket_name || '.s3.' || :region || '.amazonaws.com';

        IF (new_value_list_str IS NULL OR TRIM(new_value_list_str) = '') THEN
            new_value_list_str := base_s3_url;
        ELSE
            IF (POSITION(base_s3_url IN new_value_list_str) = 0) THEN
                new_value_list_str := new_value_list_str || ',' || base_s3_url;
            END IF;
        END IF;
        IF (POSITION(regional_s3_url IN new_value_list_str) = 0) THEN
            new_value_list_str := new_value_list_str || ',' || regional_s3_url;
        END IF;
        -- Convert the comma-separated string to individually quoted values.
        LET quoted_value_list2 STRING := '''' || REPLACE(new_value_list_str, ',', ''',''') || '''';
        EXECUTE IMMEDIATE 'ALTER NETWORK RULE ' || network_rule_name || ' SET VALUE_LIST = (' || quoted_value_list2 || ')';

        RETURN 'Iceberg EAI access setup completed for S3 bucket: ' || :bucket_name || ' in region: ' || :region;
    ELSEIF (storage_type = 'AZURE') THEN
        -- For Azure, also add the DFS endpoint.
        -- e.g. ibtexteranl.blob.core.windows.net -> ibtexteranl.dfs.core.windows.net
        dfs_host := REPLACE(storage_host, '.blob.core.windows.net', '.dfs.core.windows.net');
        IF (POSITION(dfs_host IN new_value_list_str) = 0) THEN
            new_value_list_str := new_value_list_str || ',' || dfs_host;
        END IF;
        -- Convert the comma-separated string to individually quoted values.
        LET quoted_value_list3 STRING := '''' || REPLACE(new_value_list_str, ',', ''',''') || '''';
        EXECUTE IMMEDIATE 'ALTER NETWORK RULE ' || network_rule_name || ' SET VALUE_LIST = (' || quoted_value_list3 || ')';

        RETURN 'Iceberg EAI access setup completed for Azure storage: ' || :storage_host || ' and ' || :dfs_host;
    END IF;
END;
$$;

GRANT USAGE ON PROCEDURE ICEBERG_UTILS_DB.UTILS.setup_iceberg_network_rule(STRING, STRING) TO ROLE rai_user;
```
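The trickiest parts of the procedure are the idempotent append and the VALUE_LIST quoting. A standalone Python sketch of the same string transformations, for illustration only (the real logic runs inside the SQL procedure):

```python
def append_host(value_list: str, host: str) -> str:
    """Append host to a comma-separated list only if it is not already present."""
    if not value_list or not value_list.strip():
        return host
    if host not in value_list.split(","):
        return value_list + "," + host
    return value_list

def quote_value_list(value_list: str) -> str:
    """Convert 'a,b' into the individually quoted form 'a','b' used in SET VALUE_LIST."""
    return "'" + value_list.replace(",", "','") + "'"

hosts = append_host("", "bucket.s3.amazonaws.com")
hosts = append_host(hosts, "bucket.s3.us-east-1.amazonaws.com")
hosts = append_host(hosts, "bucket.s3.amazonaws.com")  # duplicate, ignored
print(quote_value_list(hosts))
# -> 'bucket.s3.amazonaws.com','bucket.s3.us-east-1.amazonaws.com'
```

One difference worth noting: the sketch checks exact list membership, while the procedure’s POSITION check matches substrings, so the procedure can skip adding a host whose name happens to be a substring of an existing entry.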
```sql
CREATE OR REPLACE PROCEDURE ICEBERG_UTILS_DB.UTILS.curl_request(url STRING, method STRING, headers ARRAY, body STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python', 'requests')
HANDLER = 'run'
EXTERNAL_ACCESS_INTEGRATIONS = (ICEBERG_EXT_ACC_INT)
AS
$$
def run(session, url, method, headers, body):
    import requests
    import urllib3

    # Disable SSL warnings for S3 requests.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Prepare headers as a dictionary.
    headers_dict = {}
    if headers:
        for h in headers:
            if isinstance(h, dict):
                headers_dict.update(h)
            elif isinstance(h, (list, tuple)) and len(h) == 2:
                headers_dict[h[0]] = h[1]

    try:
        response = requests.request(
            method=method.upper(),
            url=url,
            headers=headers_dict if headers_dict else None,
            data=body if body else None,
            timeout=30,
            allow_redirects=True,
            verify=True,
        )

        # For HEAD requests, return the headers like curl -I.
        if method.upper() == 'HEAD':
            result = f"HTTP/1.1 {response.status_code} {response.reason}\n"
            for key, value in response.headers.items():
                result += f"{key}: {value}\n"
            return result
        else:
            return response.text
    except Exception as e:
        return f"Error: {str(e)}"
$$;
```

Register data integration for Iceberg with RAI
- Grant privileges (db, schema, and table) to the designated service user
- Update the External Access Integration
- Establish the Data Stream using PyRel.
Grant service users access to Iceberg tables
```sql
-- Requires the resource owner privilege; grants access to the role.
GRANT USAGE, MONITOR ON DATABASE iceberg_db TO ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;
GRANT USAGE, MONITOR ON SCHEMA iceberg_db.PUBLIC TO ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;
GRANT SELECT ON TABLE iceberg_db.PUBLIC.customer_iceberg TO ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;
```
```sql
-- If the external volume is not Snowflake-managed, you also need this:
GRANT USAGE ON EXTERNAL VOLUME $underlying_volume TO ROLE $user_role;
```

Update External Access Integration
```sql
CALL ICEBERG_UTILS_DB.UTILS.setup_iceberg_network_rule(
    'iceberg_nt_rule',
    'iceberg_db.PUBLIC.customer_iceberg');
```

Network rule updates scale automatically as buckets increase.
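To check what a call like this will whitelist, the host derivation can be reproduced outside Snowflake. A Python sketch of the same parsing rules the procedure applies (the HEAD response shown is a made-up example; real responses come from the `curl_request` procedure):

```python
import re
from typing import List, Optional

def storage_hosts(storage_location: str, region: Optional[str] = None) -> List[str]:
    """Derive the hosts the network rule must allow for a given table location."""
    if storage_location.startswith("s3://"):
        bucket = re.match(r"s3://([^/]+)/", storage_location).group(1)
        hosts = [f"{bucket}.s3.amazonaws.com"]
        if region:  # region comes from the x-amz-bucket-region header
            hosts.append(f"{bucket}.s3.{region}.amazonaws.com")
        return hosts
    if storage_location.startswith("azure://"):
        blob_host = re.match(r"azure://([^/]+)", storage_location).group(1)
        # The DFS endpoint is also required for Azure access.
        return [blob_host, blob_host.replace(".blob.core.windows.net", ".dfs.core.windows.net")]
    raise ValueError(f"Unsupported storage type: {storage_location}")

def region_from_head_response(head_response: str) -> Optional[str]:
    """Extract the bucket region from a curl -I style response, as the procedure does."""
    m = re.search(r"x-amz-bucket-region: ([^\n\r]+)", head_response, re.IGNORECASE)
    return m.group(1) if m else None

head = "HTTP/1.1 301 Moved Permanently\nx-amz-bucket-region: us-west-2\n"
print(storage_hosts("s3://nsh-ibt/path", region_from_head_response(head)))
# -> ['nsh-ibt.s3.amazonaws.com', 'nsh-ibt.s3.us-west-2.amazonaws.com']
```

For an S3 table this yields both the global and regional bucket endpoints; for an Azure table, both the blob and DFS endpoints, matching what the procedure appends to the network rule.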
Register and refresh Iceberg data
Section titled “Register and refresh Iceberg data”call relationalai.api.refresh_iceberg( api.object_reference('TABLE', 'iceberg_db.public.customer_iceberg'), 'CDC_MANAGED_ENGINE');