
Iceberg ingestion user guide

This document outlines the proposed user experience for our Iceberg data ingestion experience, contrasting it with the existing CDC workflow.

| Aspect | Existing CDC Workflow | Iceberg Workflow |
| --- | --- | --- |
| Authentication | Relies on the application session token to talk to Snowflake databases. | Relies on Programmatic Access Tokens (PATs), exchanged for an Iceberg OAuth token, to talk to the Iceberg catalog. |
| Authorization | Users grant us access via delegation using object references. | Users grant us access to data (Snowflake database, schema, table) explicitly. |
| Change Detection | Relies on Snowflake's native change tracking feature. | Scans the Iceberg catalog to detect changes, for maximum catalog compatibility. |
| Network Boundary | All within the Snowflake network boundary. | Requires egress to access Iceberg table storage. |
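The PAT-for-OAuth exchange in the table above can be sketched in Python. This is an illustrative sketch only: the `/v1/oauth/tokens` path follows the Iceberg REST catalog spec, the token-exchange grant type is an assumption about how the catalog accepts a PAT, and `catalog_url` and `pat` are placeholders.

```python
# Hypothetical sketch: build the request that exchanges a PAT for an Iceberg
# catalog OAuth token. Endpoint path is from the Iceberg REST catalog spec;
# the grant type actually accepted is deployment-specific (an assumption here).
def build_token_exchange_request(catalog_url: str, pat: str) -> dict:
    """Return the URL, headers, and form body for the catalog's token endpoint."""
    return {
        "url": f"{catalog_url}/v1/oauth/tokens",
        "headers": {"Content-Type": "application/x-www-form-urlencoded"},
        "body": {
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "subject_token": pat,
            "subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
        },
    }

req = build_token_exchange_request("https://catalog.example.com/api", "<PAT>")
```

The resulting `req` would then be POSTed by the app; the OAuth token in the response is what actually authenticates catalog calls.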

While the functionality achieves the customer outcomes, there are limitations in the current integration of native Iceberg tables with the RAI Native App:

  • It relies on Programmatic Access Tokens (PATs) instead of standard SPCS session tokens.
  • It requires network egress configuration for Iceberg access.

The workflow requires additional privileges on the consumer side. Since Snowflake does not allow exposing app-owned stored procedures that run with consumer privileges, these functions are exposed via the RAI CLI.

All of these SQL statements require elevated privileges:

  • Creating the database for all configurations.
  • Object creation requires the privilege to create that object type, e.g. CREATE ROLE.
  • Object modification requires ownership of that object, e.g. OWNERSHIP of the role.

Create a database for all privilege configurations


We need a database for user-owned Iceberg utilities and configurations. You can reuse an existing database or create a new one; if you reuse an existing one, update the later queries accordingly.

CREATE DATABASE ICEBERG_UTILS_DB;
USE DATABASE ICEBERG_UTILS_DB;
CREATE SCHEMA UTILS;
CREATE SCHEMA CONFIGS;

This user and role are created so that PATs can be minted as part of accessing the Iceberg REST catalog.

CREATE OR REPLACE ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;
CREATE OR REPLACE USER HORIZON_REST_SRV_ACCOUNT_USER
TYPE=SERVICE
DEFAULT_ROLE=HORIZON_REST_SRV_ACCOUNT_ROLE;
GRANT ROLE HORIZON_REST_SRV_ACCOUNT_ROLE
TO USER HORIZON_REST_SRV_ACCOUNT_USER;

This network policy blocks all public network access to the service user, which exists only so the app can mint PATs on its behalf.

USE SCHEMA ICEBERG_UTILS_DB.CONFIGS;
CREATE OR REPLACE NETWORK RULE BLOCK_PUBLIC_ACCESS_NT_RULE
MODE = INGRESS
TYPE = IPV4
VALUE_LIST = ('0.0.0.0/0');
CREATE NETWORK POLICY IF NOT EXISTS BLOCK_PUBLIC_POLICY
BLOCKED_NETWORK_RULE_LIST=('BLOCK_PUBLIC_ACCESS_NT_RULE');
ALTER USER HORIZON_REST_SRV_ACCOUNT_USER
SET NETWORK_POLICY = 'BLOCK_PUBLIC_POLICY';
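As a sanity check on the rule above: the `0.0.0.0/0` CIDR block covers the entire IPv4 address space, so listing it as a blocked network rule denies all public ingress to the service user. A quick illustration using only the Python standard library:

```python
import ipaddress

# 0.0.0.0/0 is the whole IPv4 address space, so a blocked-network rule
# containing it denies ingress from any IPv4 address.
blocked = ipaddress.ip_network("0.0.0.0/0")

print(ipaddress.ip_address("203.0.113.7") in blocked)  # True: public IPs match
print(ipaddress.ip_address("10.0.0.1") in blocked)     # True: private IPs match too
```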

The service user and role names shown here are customizable.

// Requires ownership of the user (practically, ACCOUNTADMIN); grants the RAI app permission to manage the user's PATs.
GRANT MODIFY PROGRAMMATIC AUTHENTICATION METHODS ON USER HORIZON_REST_SRV_ACCOUNT_USER TO APPLICATION RELATIONALAI;
// requires the CREATE NETWORK RULE privilege
CREATE OR REPLACE NETWORK RULE iceberg_nt_rule
TYPE = HOST_PORT
MODE = EGRESS
VALUE_LIST = ();
// requires the CREATE INTEGRATION privilege
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ICEBERG_EXT_ACC_INT
ALLOWED_NETWORK_RULES = (ICEBERG_UTILS_DB.CONFIGS.iceberg_nt_rule)
ENABLED = TRUE;
// must be called using the ACCOUNTADMIN role
CALL relationalai.app.update_reference('iceberg_eai', 'SET', SYSTEM$REFERENCE('EXTERNAL ACCESS INTEGRATION', 'ICEBERG_EXT_ACC_INT', 'PERSISTENT', 'USAGE'));

Delegate network rule management to app users


This step only creates the stored procedure and delegates it to data stream users; the actual invocation happens during data registration (next section). This separates the elevated privileges needed for setup from regular data users' privileges.

// Set up the sproc by the account admin so that users can run it.
// requires account_admin
CREATE OR REPLACE PROCEDURE ICEBERG_UTILS_DB.UTILS.setup_iceberg_network_rule(
    network_rule_name STRING,
    iceberg_table_name STRING
)
RETURNS STRING
LANGUAGE SQL
EXECUTE AS OWNER
AS
$$
DECLARE
    iceberg_info VARIANT;
    bucket_name STRING;
    base_s3_url STRING;
    curl_response STRING;
    region STRING;
    regional_s3_url STRING;
    storage_location STRING;
    storage_type STRING;
    storage_host STRING;
    dfs_host STRING;
    current_value_list_str STRING;
    new_value_list_str STRING;
BEGIN
    -- 1. Find out the bucket name
    iceberg_info := (SELECT PARSE_JSON(SYSTEM$GET_ICEBERG_TABLE_INFORMATION(:iceberg_table_name)));
    storage_location := iceberg_info:metadataLocation::STRING;
    -- Determine the storage type
    IF (STARTSWITH(storage_location, 's3://')) THEN
        storage_type := 'S3';
        -- Extract the bucket name from the base location
        -- Example: s3://nsh-ibt/path -> extract nsh-ibt
        bucket_name := REGEXP_SUBSTR(storage_location, 's3://([^/]+)/', 1, 1, 'e', 1);
        base_s3_url := bucket_name || '.s3.amazonaws.com';
        storage_host := base_s3_url;
    ELSEIF (STARTSWITH(storage_location, 'azure://')) THEN
        storage_type := 'AZURE';
        -- Parse the Azure Blob Storage URL
        -- Example: azure://ibtexteranl.blob.core.windows.net/... -> extract ibtexteranl.blob.core.windows.net
        storage_host := REGEXP_SUBSTR(storage_location, 'azure://([^/]+)', 1, 1, 'e', 1);
    ELSE
        RETURN 'Unsupported storage type: ' || storage_location;
    END IF;
    -- 2. Whitelist the storage host
    -- Retrieve the current VALUE_LIST using DESCRIBE + RESULT_SCAN
    EXECUTE IMMEDIATE 'DESCRIBE NETWORK RULE ' || network_rule_name;
    SELECT "value_list" INTO :current_value_list_str
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
    IF (current_value_list_str IS NULL OR TRIM(current_value_list_str) = '') THEN
        new_value_list_str := storage_host;
    ELSE
        -- Only append if not already present
        IF (POSITION(storage_host IN current_value_list_str) = 0) THEN
            new_value_list_str := current_value_list_str || ',' || storage_host;
        ELSE
            new_value_list_str := current_value_list_str;
        END IF;
    END IF;
    -- Convert the comma-separated string to individually quoted values: 'a,b' -> '''a'',''b'''
    LET quoted_value_list STRING := '''' || REPLACE(new_value_list_str, ',', ''',''') || '''';
    EXECUTE IMMEDIATE 'ALTER NETWORK RULE ' || network_rule_name || ' SET VALUE_LIST = (' || quoted_value_list || ')';
    -- 3. Re-read the current VALUE_LIST after the step 2 update
    EXECUTE IMMEDIATE 'DESCRIBE NETWORK RULE ' || network_rule_name;
    SELECT "value_list" INTO :current_value_list_str
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
    new_value_list_str := current_value_list_str;
    -- For S3, find the region and add the regional URL
    IF (storage_type = 'S3') THEN
        -- Equivalent to: curl -I https://s3.amazonaws.com/bucket-name | grep bucket-region
        CALL curl_request('https://' || :base_s3_url, 'HEAD', NULL, NULL) INTO :curl_response;
        -- Extract the region from the x-amz-bucket-region header
        region := REGEXP_SUBSTR(:curl_response, 'x-amz-bucket-region: ([^\n\r]+)', 1, 1, 'ie', 1);
        regional_s3_url := bucket_name || '.s3.' || :region || '.amazonaws.com';
        IF (new_value_list_str IS NULL OR TRIM(new_value_list_str) = '') THEN
            new_value_list_str := base_s3_url;
        ELSE
            IF (POSITION(base_s3_url IN new_value_list_str) = 0) THEN
                new_value_list_str := new_value_list_str || ',' || base_s3_url;
            END IF;
        END IF;
        IF (POSITION(regional_s3_url IN new_value_list_str) = 0) THEN
            new_value_list_str := new_value_list_str || ',' || regional_s3_url;
        END IF;
        -- Convert the comma-separated string to individually quoted values: 'a,b' -> '''a'',''b'''
        LET quoted_value_list2 STRING := '''' || REPLACE(new_value_list_str, ',', ''',''') || '''';
        EXECUTE IMMEDIATE 'ALTER NETWORK RULE ' || network_rule_name || ' SET VALUE_LIST = (' || quoted_value_list2 || ')';
        RETURN 'Iceberg EAI access setup completed for S3 bucket: ' || :bucket_name || ' in region: ' || :region;
    ELSEIF (storage_type = 'AZURE') THEN
        -- For Azure, also add the DFS endpoint
        -- e.g. ibtexteranl.blob.core.windows.net -> ibtexteranl.dfs.core.windows.net
        dfs_host := REPLACE(storage_host, '.blob.core.windows.net', '.dfs.core.windows.net');
        IF (POSITION(dfs_host IN new_value_list_str) = 0) THEN
            new_value_list_str := new_value_list_str || ',' || dfs_host;
        END IF;
        -- Convert the comma-separated string to individually quoted values
        LET quoted_value_list3 STRING := '''' || REPLACE(new_value_list_str, ',', ''',''') || '''';
        EXECUTE IMMEDIATE 'ALTER NETWORK RULE ' || network_rule_name || ' SET VALUE_LIST = (' || quoted_value_list3 || ')';
        RETURN 'Iceberg EAI access setup completed for Azure storage: ' || :storage_host || ' and ' || :dfs_host;
    END IF;
END;
$$;
GRANT USAGE ON PROCEDURE ICEBERG_UTILS_DB.UTILS.setup_iceberg_network_rule(STRING,STRING) TO ROLE rai_user;
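The string handling inside `setup_iceberg_network_rule` can be sketched in plain Python. This is an illustrative re-implementation for understanding the logic, not part of the deployment; the function names are hypothetical.

```python
import re

# Hypothetical Python mirror of the procedure's string handling, for illustration only.
def parse_storage_host(storage_location: str) -> str:
    """Extract the storage host from an s3:// or azure:// metadata location."""
    if storage_location.startswith("s3://"):
        bucket = re.match(r"s3://([^/]+)/", storage_location).group(1)
        return f"{bucket}.s3.amazonaws.com"
    if storage_location.startswith("azure://"):
        return re.match(r"azure://([^/]+)", storage_location).group(1)
    raise ValueError(f"Unsupported storage type: {storage_location}")

def append_host(value_list: str, host: str) -> str:
    """Append a host to a comma-separated list only if it is not already present."""
    if not value_list or not value_list.strip():
        return host
    return value_list if host in value_list else f"{value_list},{host}"

def quote_value_list(value_list: str) -> str:
    """Convert 'a,b' into the quoted form 'a','b' used in ALTER NETWORK RULE."""
    return "'" + value_list.replace(",", "','") + "'"
```

For example, `quote_value_list(append_host("", parse_storage_host("s3://nsh-ibt/path")))` yields `'nsh-ibt.s3.amazonaws.com'`, which is exactly the text spliced into the `ALTER NETWORK RULE ... SET VALUE_LIST = (...)` statement.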
CREATE OR REPLACE PROCEDURE ICEBERG_UTILS_DB.UTILS.curl_request(url STRING, method STRING, headers ARRAY, body STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python', 'requests')
HANDLER = 'run'
EXTERNAL_ACCESS_INTEGRATIONS = (ICEBERG_EXT_ACC_INT)
AS
$$
def run(session, url, method, headers, body):
    import requests
    import urllib3

    # Disable SSL warnings for S3 requests
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Prepare headers as a dictionary
    headers_dict = {}
    if headers:
        for h in headers:
            if isinstance(h, dict):
                headers_dict.update(h)
            elif isinstance(h, (list, tuple)) and len(h) == 2:
                headers_dict[h[0]] = h[1]
    try:
        response = requests.request(
            method=method.upper(),
            url=url,
            headers=headers_dict if headers_dict else None,
            data=body if body else None,
            timeout=30,
            allow_redirects=True,
            verify=True
        )
        # For HEAD requests, return the status line and headers like curl -I
        if method.upper() == 'HEAD':
            result = f"HTTP/1.1 {response.status_code} {response.reason}\n"
            for key, value in response.headers.items():
                result += f"{key}: {value}\n"
            return result
        else:
            return response.text
    except Exception as e:
        return f"Error: {str(e)}"
$$;
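Back in `setup_iceberg_network_rule`, the HEAD output produced by `curl_request` is scanned for the `x-amz-bucket-region` header. The same extraction can be tested in isolation with Python; the sample response text below is fabricated for illustration.

```python
import re

# Extract the S3 bucket region from curl-style HEAD output, mirroring the
# REGEXP_SUBSTR call in setup_iceberg_network_rule.
def extract_bucket_region(head_response):
    m = re.search(r"x-amz-bucket-region: ([^\n\r]+)", head_response, re.IGNORECASE)
    return m.group(1) if m else None

# Fabricated sample of what curl_request returns for a HEAD request.
sample = (
    "HTTP/1.1 200 OK\n"
    "x-amz-bucket-region: us-west-2\n"
    "Content-Type: application/xml\n"
)
print(extract_bucket_region(sample))  # us-west-2
```

The extracted region is used to build the regional endpoint, e.g. `nsh-ibt.s3.us-west-2.amazonaws.com`, which is then appended to the network rule.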

Register data integration for Iceberg with RAI

Registering an Iceberg table with RAI involves three steps:
  • Grant privileges (db, schema, and table) to the designated service user
  • Update the External Access Integration
  • Establish the Data Stream using PyRel.

Grant service users access to Iceberg tables

// Requires ownership of the objects; grants the service role access to them.
GRANT USAGE,MONITOR ON DATABASE iceberg_db TO ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;
GRANT USAGE,MONITOR ON SCHEMA iceberg_db.PUBLIC TO ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;
GRANT SELECT ON TABLE iceberg_db.PUBLIC.customer_iceberg TO ROLE HORIZON_REST_SRV_ACCOUNT_ROLE;
// If the external volume is not Snowflake-managed, you also need this
GRANT USAGE ON EXTERNAL VOLUME $underlying_volume TO ROLE $user_role;
CALL ICEBERG_UTILS_DB.UTILS.setup_iceberg_network_rule(
    'iceberg_nt_rule',
    'iceberg_db.PUBLIC.customer_iceberg');

Because the procedure appends new hosts to the network rule's VALUE_LIST, updates scale automatically as the number of buckets grows.

CALL relationalai.api.refresh_iceberg(
    api.object_reference('TABLE', 'iceberg_db.public.customer_iceberg'),
    'CDC_MANAGED_ENGINE'
);