From 71fbe844f67905f7c32e69ca8c4b351290bd13be Mon Sep 17 00:00:00 2001 From: HH Date: Wed, 13 Sep 2023 17:04:57 +0800 Subject: [PATCH 1/4] Revise the doc: quering data in Snowflake Signed-off-by: HH --- examples/snowflake_plugin/snowflake_plugin/snowflake.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/examples/snowflake_plugin/snowflake_plugin/snowflake.py b/examples/snowflake_plugin/snowflake_plugin/snowflake.py index 4a466b3f0..7a2e77eb7 100644 --- a/examples/snowflake_plugin/snowflake_plugin/snowflake.py +++ b/examples/snowflake_plugin/snowflake_plugin/snowflake.py @@ -11,6 +11,7 @@ # %% [markdown] # Instantiate a {py:class}`~flytekitplugins.snowflake.SnowflakeTask` to execute a query. # Incorporate {py:class}`~flytekitplugins.snowflake.SnowflakeConfig` within the task to define the appropriate configuration. +# USERNAME is the account you login the snowflake website. # %% snowflake_task_no_io = SnowflakeTask( name="sql.snowflake.no_io", @@ -22,6 +23,8 @@ database="SNOWFLAKE_SAMPLE_DATA", schema="TPCH_SF1000", warehouse="COMPUTE_WH", + table="", + user="", ), ) @@ -58,6 +61,7 @@ # # Let us explore how we can parameterize our query to filter results for a specific country. # This country will be provided as user input, using a nation key to specify it. +# USERNAME is the account you login the snowflake website. # %% snowflake_task_templatized_query = SnowflakeTask( name="sql.snowflake.w_io", @@ -68,8 +72,10 @@ database="SNOWFLAKE_SAMPLE_DATA", schema="TPCH_SF1000", warehouse="COMPUTE_WH", + table="", + user="", ), - query_template="SELECT * from CUSTOMER where C_NATIONKEY = {{ .inputs.nation_key }} limit 100", + query_template="SELECT * from CUSTOMER where C_NATIONKEY = %(nation_key)s limit 100", ) From 306a33a71efed99458846293113413493142c050 Mon Sep 17 00:00:00 2001 From: HH Date: Wed, 13 Sep 2023 17:35:06 +0800 Subject: [PATCH 2/4] Revise the doc: Add snowflake uri decompose in structure dataset Signed-off-by: HH --- .../data_types_and_io/structured_dataset.py | 53 ++++++++++++++++--- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/structured_dataset.py b/examples/data_types_and_io/data_types_and_io/structured_dataset.py index 85db71718..02e04716b 100644 --- a/examples/data_types_and_io/data_types_and_io/structured_dataset.py +++ b/examples/data_types_and_io/data_types_and_io/structured_dataset.py @@ -9,7 +9,7 @@ # # Structured dataset is a superset of Flyte Schema. # -# The `StructuredDataset` Transformer can write a dataframe to BigQuery, s3, or any storage by registering new structured dataset encoder and decoder. +# The `StructuredDataset` Transformer can write a dataframe to BigQuery, s3, Snowflake, or any storage by registering new structured dataset encoder and decoder. # # Flytekit makes it possible to return or accept a {py:class}`pandas.DataFrame` which is automatically # converted into Flyte's abstract representation of a structured dataset object. @@ -81,17 +81,18 @@ def get_schema_df(a: int) -> FlyteSchema[superset_cols]: def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]: df = df.open(pd.DataFrame).all() df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])]) - # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table + # When specifying a BigQuery or Snowflake URI for a StructuredDataset, flytekit exports a Pandas DataFrame to a table in BigQuery or Snowflake. return StructuredDataset(dataframe=df) # %% [markdown] # ## StructuredDataset with `uri` Argument # -# BigQuery `uri` allows you to load and retrieve data from cloud using the `uri`. The `uri` comprises of the bucket name and the filename prefixed with `gs://`. -# If you specify BigQuery `uri` for StructuredDataset, BigQuery creates a table in the location specified by the `uri`. -# The `uri` in StructuredDataset reads from or writes to S3, GCP, BigQuery, or any storage. -# Let's understand how to convert a pandas DataFrame to a BigQuery table and vice-versa through an example. +# Both Snowflake and BigQuery `uri` allows you to load and retrieve data from cloud using the `uri`. +# The `uri` comprises of the bucket name and the filename prefixed with `bq://` for BigQuery and `snowflake://` for Snowflake. +# If you specify in either BigQuery or Snowflake `uri` for StructuredDataset, it will create a table in the location specified by the `uri`. +# The `uri` in StructuredDataset reads from or writes to S3, GCP, BigQuery, Snowflake or any storage. +# Let's understand how to convert a pandas DataFrame to a BigQuery or Snowflake table and vice-versa through an example. # # Before writing DataFrame to a BigQuery table, # @@ -99,6 +100,15 @@ def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[St # 2. Create a project and add the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to your .bashrc file. # 3. Create a dataset in your project. +# Before writing DataFrame to a Snowflake table, +# +# 1. Create a [Snowflake account](https://signup.snowflake.com/) and create a service account. +# 2. Create a dataset in your project. +# 3. Use [Key Pair Authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) to setup the connections with Snowflake. +# 4. run the following command to setup the secret +# ```bash +# kubectl create secret generic snowflake --namespace=flyte --from-literal=private_key={your_private_key_above} +# ``` # %% [markdown] # Import the dependencies. # %% @@ -118,6 +128,17 @@ def pandas_to_bq() -> StructuredDataset: return StructuredDataset(dataframe=df, uri="bq://sample-project-1-352610.sample_352610.test1") +# %% [markdown] +# Define a task that converts a pandas DataFrame to a Snowflake table. +# %% +@task +def pandas_to_sf() -> StructuredDataset: + # create a pandas dataframe + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) + # convert the dataframe to StructuredDataset + return StructuredDataset(dataframe=df, uri="snowflake://:////") + + # %% [markdown] # :::{note} # The BigQuery uri's format is `bq://..`. @@ -131,6 +152,19 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame: # convert to pandas dataframe return sd.open(pd.DataFrame).all() +# %% [markdown] +# :::{note} +# The Snowflake uri's format is `snowflake://:////
`. +# ::: + +# %% [markdown] +# Define a task that converts the Snowflake table to a pandas DataFrame. +# %% +@task +def sf_to_pandas(sd: StructuredDataset) -> pd.DataFrame: + # convert to pandas dataframe + return sd.open(pd.DataFrame).all() + # %% [markdown] # :::{note} @@ -141,8 +175,11 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame: # Trigger the tasks locally. # %% if __name__ == "__main__": - o1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1")) - o2 = pandas_to_bq() + obj_bq_1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1")) + obj_bq_2 = pandas_to_bq() + + obj_sf_1 = sf_to_pandas(sd=StructuredDataset(uri="snowflake://:////
")) + obj_sf_2 = pandas_to_sf() # %% [markdown] From f62da5c7219a428e8894ca74f241b624c46c6d24 Mon Sep 17 00:00:00 2001 From: HH Date: Sat, 30 Sep 2023 00:13:24 +0800 Subject: [PATCH 3/4] Fix lint and update uri Signed-off-by: HH --- .../data_types_and_io/structured_dataset.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/structured_dataset.py b/examples/data_types_and_io/data_types_and_io/structured_dataset.py index 02e04716b..2a6e38491 100644 --- a/examples/data_types_and_io/data_types_and_io/structured_dataset.py +++ b/examples/data_types_and_io/data_types_and_io/structured_dataset.py @@ -107,7 +107,7 @@ def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[St # 3. Use [Key Pair Authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) to setup the connections with Snowflake. # 4. run the following command to setup the secret # ```bash -# kubectl create secret generic snowflake --namespace=flyte --from-literal=private_key={your_private_key_above} +# kubectl create secret generic snowflake --namespace=flytesnacks-development --from-file=private_key.pem={your_private_key_above} # ``` # %% [markdown] # Import the dependencies. @@ -136,7 +136,9 @@ def pandas_to_sf() -> StructuredDataset: # create a pandas dataframe df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) # convert the dataframe to StructuredDataset - return StructuredDataset(dataframe=df, uri="snowflake://:////
") + return StructuredDataset( + dataframe=df, uri="snowflake://:////
" + ) # %% [markdown] @@ -152,9 +154,10 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame: # convert to pandas dataframe return sd.open(pd.DataFrame).all() + # %% [markdown] # :::{note} -# The Snowflake uri's format is `snowflake://:////
`. +# The Snowflake uri's format is `snowflake://:////
`. # ::: # %% [markdown] @@ -178,7 +181,9 @@ def sf_to_pandas(sd: StructuredDataset) -> pd.DataFrame: obj_bq_1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1")) obj_bq_2 = pandas_to_bq() - obj_sf_1 = sf_to_pandas(sd=StructuredDataset(uri="snowflake://:////
")) + obj_sf_1 = sf_to_pandas( + sd=StructuredDataset(uri="snowflake://:////
") + ) obj_sf_2 = pandas_to_sf() From c4fe9ce8f0fd34eb88710aec768045a5fab51185 Mon Sep 17 00:00:00 2001 From: HH Date: Sat, 30 Sep 2023 23:35:48 +0800 Subject: [PATCH 4/4] remove .pem Signed-off-by: HH --- .../data_types_and_io/data_types_and_io/structured_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/data_types_and_io/data_types_and_io/structured_dataset.py b/examples/data_types_and_io/data_types_and_io/structured_dataset.py index 2a6e38491..86517e47b 100644 --- a/examples/data_types_and_io/data_types_and_io/structured_dataset.py +++ b/examples/data_types_and_io/data_types_and_io/structured_dataset.py @@ -107,7 +107,7 @@ def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[St # 3. Use [Key Pair Authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) to setup the connections with Snowflake. # 4. run the following command to setup the secret # ```bash -# kubectl create secret generic snowflake --namespace=flytesnacks-development --from-file=private_key.pem={your_private_key_above} +# kubectl create secret generic snowflake --namespace=flytesnacks-development --from-file=private_key={your_private_key_above} # ``` # %% [markdown] # Import the dependencies.