Add Snowflake analytics schemas (MystenLabs#15083)
## Description 

Add schemas for setting up the analytics suite in Snowflake. This is a
snapshot of the existing schemas, which can be reused to bootstrap the
analytics framework.

## Test Plan 

Followed the schemas in this PR to set up Parquet-based data ingestion.
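
A minimal verification sketch (these exact checks are assumptions, not part of this PR): confirm a pipe is running and that rows are landing.

// Check the pipe's execution state and pending file count
SELECT SYSTEM$PIPE_STATUS('checkpoint_pipe');
// Confirm rows are arriving and how fresh they are
SELECT COUNT(*) AS row_count, MAX(timestamp_ms) AS latest_ms FROM CHECKPOINT;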
sadhansood authored Nov 30, 2023
1 parent 4f3fcff commit 49e089d
Showing 8 changed files with 462 additions and 0 deletions.
@@ -0,0 +1,70 @@
// Define the checkpoint table schema
CREATE OR REPLACE TABLE CHECKPOINT
(
checkpoint_digest STRING
CONSTRAINT checkpoint_digest_unique UNIQUE NOT NULL,
sequence_number NUMBER(20, 0)
CONSTRAINT sequence_num_pk PRIMARY KEY NOT NULL,
epoch NUMBER(20, 0) NOT NULL,
timestamp_ms NUMBER(20, 0) NOT NULL,
previous_checkpoint_digest STRING,
end_of_epoch BOOLEAN NOT NULL,
total_gas_cost NUMBER(20, 0) NOT NULL,
computation_cost NUMBER(20, 0) NOT NULL,
storage_cost NUMBER(20, 0) NOT NULL,
storage_rebate NUMBER(20, 0) NOT NULL,
non_refundable_storage_fee NUMBER(20, 0) NOT NULL,
total_transaction_blocks NUMBER(20, 0) NOT NULL,
total_transactions NUMBER(20, 0) NOT NULL,
total_successful_transaction_blocks NUMBER(20, 0) NOT NULL,
total_successful_transactions NUMBER(20, 0) NOT NULL,
network_total_transaction NUMBER(20, 0) NOT NULL,
validator_signature STRING NOT NULL
) STAGE_FILE_FORMAT = parquet_format
STAGE_COPY_OPTIONS =
(
ON_ERROR = ABORT_STATEMENT
)
ENABLE_SCHEMA_EVOLUTION = TRUE
CLUSTER BY
(
timestamp_ms
);

// Define the checkpoint stage
CREATE OR REPLACE STAGE checkpoints_parquet_stage
URL = '&{checkpoints_bucket}/checkpoints/'
STORAGE_INTEGRATION = checkpoints_data_loader
FILE_FORMAT = parquet_format;

// Set up the checkpoint auto-ingestion pipe
CREATE OR REPLACE PIPE checkpoint_pipe
AUTO_INGEST = true
INTEGRATION = 'CHECKPOINTS_DATA_LOADER_NOTIFICATION'
AS
COPY INTO CHECKPOINT (checkpoint_digest, sequence_number, epoch, timestamp_ms, previous_checkpoint_digest,
end_of_epoch, total_gas_cost, computation_cost, storage_cost, storage_rebate,
non_refundable_storage_fee, total_transaction_blocks, total_transactions,
total_successful_transaction_blocks, total_successful_transactions,
network_total_transaction, validator_signature)
FROM (SELECT t.$1:checkpoint_digest as checkpoint_digest,
t.$1:sequence_number as sequence_number,
t.$1:epoch as epoch,
t.$1:timestamp_ms as timestamp_ms,
t.$1:previous_checkpoint_digest as previous_checkpoint_digest,
t.$1:end_of_epoch as end_of_epoch,
t.$1:total_gas_cost as total_gas_cost,
t.$1:computation_cost as computation_cost,
t.$1:storage_cost as storage_cost,
t.$1:storage_rebate as storage_rebate,
t.$1:non_refundable_storage_fee as non_refundable_storage_fee,
t.$1:total_transaction_blocks as total_transaction_blocks,
t.$1:total_transactions as total_transactions,
t.$1:total_successful_transaction_blocks as total_successful_transaction_blocks,
t.$1:total_successful_transactions as total_successful_transactions,
t.$1:network_total_transaction as network_total_transaction,
t.$1:validator_signature as validator_signature
from @checkpoints_parquet_stage (file_format => 'parquet_format', pattern => '.*[.]parquet') t)
FILE_FORMAT = parquet_format;
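
As an illustrative consumer query (not part of this PR), per-epoch activity and gas totals can be read straight off the loaded table:

// Example: per-epoch totals from ingested checkpoint data
SELECT epoch,
       SUM(total_transactions) AS transactions,
       SUM(total_gas_cost)     AS gas_cost
FROM CHECKPOINT
GROUP BY epoch
ORDER BY epoch;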
crates/sui-analytics-indexer/src/store/snowflake/schemas/event.sql (60 additions, 0 deletions)
@@ -0,0 +1,60 @@
CREATE OR REPLACE TABLE EVENT
(
transaction_digest STRING NOT NULL,
event_index NUMBER(20, 0) NOT NULL,
checkpoint NUMBER(20, 0) NOT NULL,
epoch NUMBER(20, 0) NOT NULL,
timestamp_ms NUMBER(20, 0) NOT NULL,
sender STRING NOT NULL,
package STRING NOT NULL,
module STRING NOT NULL,
event_type STRING NOT NULL,
bcs STRING NOT NULL,
event_json VARIANT
) STAGE_FILE_FORMAT = parquet_format
STAGE_COPY_OPTIONS =
(
ON_ERROR = ABORT_STATEMENT
)
ENABLE_SCHEMA_EVOLUTION = TRUE
CLUSTER BY
(
timestamp_ms
);

// Define the events stage
CREATE OR REPLACE STAGE events_parquet_stage
URL = '&{checkpoints_bucket}/events/'
STORAGE_INTEGRATION = checkpoints_data_loader
FILE_FORMAT = parquet_format;

// Set up the event auto-ingestion pipe
CREATE OR REPLACE PIPE event_pipe
AUTO_INGEST = true
INTEGRATION = 'CHECKPOINTS_DATA_LOADER_NOTIFICATION'
AS
COPY INTO EVENT (transaction_digest,
event_index,
checkpoint,
epoch,
timestamp_ms,
sender,
package,
module,
event_type,
bcs,
event_json)
FROM (SELECT t.$1:transaction_digest as transaction_digest,
t.$1:event_index as event_index,
t.$1:checkpoint as checkpoint,
t.$1:epoch as epoch,
t.$1:timestamp_ms as timestamp_ms,
t.$1:sender as sender,
t.$1:package as package,
t.$1:module as module,
t.$1:event_type as event_type,
t.$1:bcs as bcs,
parse_json(t.$1:event_json) as event_json
from @events_parquet_stage (file_format => 'parquet_format', pattern => '.*[.]parquet') t)
FILE_FORMAT = parquet_format;
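
Since event_json is a VARIANT, its fields can be addressed with Snowflake's colon path syntax. A hypothetical example (the amount field is illustrative, not taken from this PR):

// Example: count events by type; 'amount' is a hypothetical JSON field
SELECT event_type,
       COUNT(*)                     AS events,
       ANY_VALUE(event_json:amount) AS sample_amount
FROM EVENT
GROUP BY event_type
ORDER BY events DESC;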
@@ -0,0 +1,48 @@

CREATE OR REPLACE TABLE MOVE_CALL
(
transaction_digest STRING NOT NULL,
checkpoint NUMBER(20, 0) NOT NULL,
epoch NUMBER(20, 0) NOT NULL,
timestamp_ms NUMBER(20, 0) NOT NULL,
package STRING NOT NULL,
module STRING NOT NULL,
function_ STRING NOT NULL
) STAGE_FILE_FORMAT = parquet_format
STAGE_COPY_OPTIONS =
(
ON_ERROR = ABORT_STATEMENT
)
ENABLE_SCHEMA_EVOLUTION = TRUE
CLUSTER BY
(
timestamp_ms
);

// Define the move_call stage
CREATE OR REPLACE STAGE move_call_parquet_stage
URL = '&{checkpoints_bucket}/move_call/'
STORAGE_INTEGRATION = checkpoints_data_loader
FILE_FORMAT = parquet_format;

// Set up the move_call auto-ingestion pipe
CREATE OR REPLACE PIPE move_call_pipe
AUTO_INGEST = true
INTEGRATION = 'CHECKPOINTS_DATA_LOADER_NOTIFICATION'
AS
COPY INTO MOVE_CALL (transaction_digest,
checkpoint,
epoch,
timestamp_ms,
package,
module,
function_)
FROM (SELECT t.$1:transaction_digest as transaction_digest,
t.$1:checkpoint as checkpoint,
t.$1:epoch as epoch,
t.$1:timestamp_ms as timestamp_ms,
t.$1:package as package,
t.$1:module as module,
t.$1:function_ as function_
from @move_call_parquet_stage (file_format => 'parquet_format', pattern => '.*[.]parquet') t)
FILE_FORMAT = parquet_format;
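
An illustrative query against this table (not part of this PR): the most frequently called Move functions.

// Example: top called Move functions
SELECT package, module, function_, COUNT(*) AS calls
FROM MOVE_CALL
GROUP BY package, module, function_
ORDER BY calls DESC
LIMIT 20;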
@@ -0,0 +1,68 @@
CREATE OR REPLACE TABLE OBJECT
(
object_id STRING NOT NULL,
version NUMBER(20, 0) NOT NULL,
digest STRING NOT NULL,
type STRING,
checkpoint NUMBER(20, 0) NOT NULL,
epoch NUMBER(20, 0) NOT NULL,
timestamp_ms NUMBER(20, 0) NOT NULL,
owner_type STRING NOT NULL,
owner_address STRING,
object_status STRING,
initial_shared_version NUMBER(20, 0),
previous_transaction STRING NOT NULL,
has_public_transfer BOOLEAN NOT NULL,
storage_rebate NUMBER(20, 0) NOT NULL,
bcs STRING NOT NULL,
coin_type STRING,
coin_balance NUMBER(20, 0),
struct_tag STRING,
object_json variant
) STAGE_FILE_FORMAT = parquet_format
STAGE_COPY_OPTIONS =
(
ON_ERROR = ABORT_STATEMENT
)
ENABLE_SCHEMA_EVOLUTION = TRUE
CLUSTER BY
(
timestamp_ms
);

// Define the objects stage
CREATE OR REPLACE STAGE objects_parquet_stage
URL = '&{checkpoints_bucket}/objects/'
STORAGE_INTEGRATION = checkpoints_data_loader
FILE_FORMAT = parquet_format;

// Set up the object auto-ingestion pipe
CREATE OR REPLACE PIPE object_pipe
AUTO_INGEST = true
INTEGRATION = 'CHECKPOINTS_DATA_LOADER_NOTIFICATION'
AS
COPY INTO OBJECT (object_id, version, digest, type, checkpoint, epoch, timestamp_ms, owner_type, owner_address,
object_status, initial_shared_version, previous_transaction, has_public_transfer,
storage_rebate, bcs, coin_type, coin_balance, struct_tag, object_json)
FROM (SELECT t.$1:object_id as object_id,
t.$1:version as version,
t.$1:digest as digest,
t.$1:type_ as type,
t.$1:checkpoint as checkpoint,
t.$1:epoch as epoch,
t.$1:timestamp_ms as timestamp_ms,
t.$1:owner_type as owner_type,
t.$1:owner_address as owner_address,
t.$1:object_status as object_status,
t.$1:initial_shared_version as initial_shared_version,
t.$1:previous_transaction as previous_transaction,
t.$1:has_public_transfer as has_public_transfer,
t.$1:storage_rebate as storage_rebate,
t.$1:bcs as bcs,
t.$1:coin_type as coin_type,
t.$1:coin_balance as coin_balance,
t.$1:struct_tag as struct_tag,
parse_json(t.$1:object_json) as object_json
from @objects_parquet_stage (file_format => 'parquet_format', pattern => '.*[.]parquet') t)
FILE_FORMAT = parquet_format;
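
As an illustrative query (not part of this PR), the coin-specific columns, which are NULL for non-coin objects, support balance aggregation:

// Example: aggregate balances across coin objects
SELECT coin_type,
       COUNT(*)          AS objects,
       SUM(coin_balance) AS total_balance
FROM OBJECT
WHERE coin_type IS NOT NULL
GROUP BY coin_type
ORDER BY total_balance DESC;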
@@ -0,0 +1,40 @@
CREATE OR REPLACE TABLE MOVE_PACKAGE
(
package_id STRING NOT NULL,
checkpoint NUMBER(20, 0) NOT NULL,
epoch NUMBER(20, 0) NOT NULL,
timestamp_ms NUMBER(20, 0) NOT NULL,
bcs STRING NOT NULL,
transaction_digest STRING
) STAGE_FILE_FORMAT = parquet_format
STAGE_COPY_OPTIONS =
(
ON_ERROR = ABORT_STATEMENT
)
ENABLE_SCHEMA_EVOLUTION = TRUE
CLUSTER BY
(
timestamp_ms
);

// Define the packages stage
CREATE OR REPLACE STAGE packages_parquet_stage
URL = '&{checkpoints_bucket}/move_package/'
STORAGE_INTEGRATION = checkpoints_data_loader
FILE_FORMAT = parquet_format;

// Set up the package auto-ingestion pipe
CREATE OR REPLACE PIPE package_pipe
AUTO_INGEST = true
INTEGRATION = 'CHECKPOINTS_DATA_LOADER_NOTIFICATION'
AS
COPY INTO MOVE_PACKAGE (package_id, checkpoint, epoch, timestamp_ms, bcs, transaction_digest)
FROM (SELECT t.$1:package_id as package_id,
t.$1:checkpoint as checkpoint,
t.$1:epoch as epoch,
t.$1:timestamp_ms as timestamp_ms,
t.$1:bcs as bcs,
t.$1:transaction_digest as transaction_digest
from @packages_parquet_stage (file_format => 'parquet_format', pattern => '.*[.]parquet') t)
FILE_FORMAT = parquet_format;
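
An illustrative query (not part of this PR): package publication volume per epoch.

// Example: packages published per epoch
SELECT epoch, COUNT(*) AS packages_published
FROM MOVE_PACKAGE
GROUP BY epoch
ORDER BY epoch;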
crates/sui-analytics-indexer/src/store/snowflake/schemas/setup.sql (19 additions, 0 deletions)
@@ -0,0 +1,19 @@
// Set checkpoints_bucket as a variable or pass it on the command line.
// This sets up a storage integration for the GCS bucket that checkpoint data is read from.
CREATE STORAGE INTEGRATION checkpoints_data_loader
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('&{checkpoints_bucket}/checkpoints', '&{checkpoints_bucket}/events',
                             '&{checkpoints_bucket}/move_call', '&{checkpoints_bucket}/move_package',
                             '&{checkpoints_bucket}/objects', '&{checkpoints_bucket}/transaction_objects',
                             '&{checkpoints_bucket}/transactions');
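
For the integration to work, the GCS service account that Snowflake generates must be granted read access on the bucket. Retrieving it is a standard Snowflake step assumed here, not part of this PR:

// Look up STORAGE_GCP_SERVICE_ACCOUNT and grant it access on the bucket
DESC STORAGE INTEGRATION checkpoints_data_loader;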

// This sets up a notification integration with pubsub_subscription_id as the Pub/Sub subscription id
CREATE NOTIFICATION INTEGRATION checkpoints_data_loader_notification
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = TRUE
GCP_PUBSUB_SUBSCRIPTION_NAME = '&{pubsub_subscription_id}';
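
Likewise, the Pub/Sub service account behind the notification integration needs subscriber access on the subscription (assumed setup step, not part of this PR):

// Look up GCP_PUBSUB_SERVICE_ACCOUNT and grant it Pub/Sub subscriber access
DESC NOTIFICATION INTEGRATION checkpoints_data_loader_notification;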

// This sets up the parquet file format used for all queries
CREATE OR REPLACE FILE FORMAT parquet_format
TYPE = parquet
SNAPPY_COMPRESSION = TRUE;
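
Note that AUTO_INGEST only picks up files that land after a pipe is created; Parquet files already in the bucket can be backfilled with a refresh (a standard Snowflake step, assumed rather than taken from this PR):

// Backfill files that predate the pipe (repeat per pipe as needed)
ALTER PIPE checkpoint_pipe REFRESH;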