Revise snowflake plugins doc #1127

Open · wants to merge 4 commits into master
Revise the doc: Add snowflake uri decompose in structure dataset
Signed-off-by: HH <[email protected]>
hhcs9527 committed Sep 29, 2023
commit 306a33a71efed99458846293113413493142c050
@@ -9,7 +9,7 @@
#
# Structured dataset is a superset of Flyte Schema.
#
# The `StructuredDataset` Transformer can write a dataframe to BigQuery, s3, or any storage by registering new structured dataset encoder and decoder.
# The `StructuredDataset` transformer can write a dataframe to BigQuery, S3, Snowflake, or any other storage by registering a new structured dataset encoder and decoder.
#
# Flytekit makes it possible to return or accept a {py:class}`pandas.DataFrame` which is automatically
# converted into Flyte's abstract representation of a structured dataset object.
@@ -81,24 +81,34 @@ def get_schema_df(a: int) -> FlyteSchema[superset_cols]:
def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]:
    df = df.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
    # When you specify a BigQuery or Snowflake uri for a StructuredDataset, flytekit writes the pandas DataFrame to a table in BigQuery or Snowflake.
    return StructuredDataset(dataframe=df)


# %% [markdown]
# ## StructuredDataset with `uri` Argument
#
# BigQuery `uri` allows you to load and retrieve data from cloud using the `uri`. The `uri` comprises of the bucket name and the filename prefixed with `gs://`.
# If you specify BigQuery `uri` for StructuredDataset, BigQuery creates a table in the location specified by the `uri`.
# The `uri` in StructuredDataset reads from or writes to S3, GCP, BigQuery, or any storage.
# Let's understand how to convert a pandas DataFrame to a BigQuery table and vice-versa through an example.
# Both the Snowflake and BigQuery `uri` let you load and retrieve data from the cloud.
# The `uri` identifies the target table and is prefixed with `bq://` for BigQuery or `snowflake://` for Snowflake.
# If you specify either a BigQuery or a Snowflake `uri` for a StructuredDataset, a table is created at the location the `uri` points to.
# The `uri` in StructuredDataset can read from or write to S3, GCS, BigQuery, Snowflake, or any other storage.
# Let's walk through converting a pandas DataFrame to a BigQuery or Snowflake table and vice versa (a sketch of how the Snowflake `uri` decomposes follows the setup steps below).
#
# Before writing a DataFrame to a BigQuery table,
#
# 1. Create a [GCP account](https://cloud.google.com/docs/authentication/getting-started) and create a service account.
# 2. Create a project and add the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to your .bashrc file.
# 3. Create a dataset in your project.

# Before writing a DataFrame to a Snowflake table,
#
# 1. Create a [Snowflake account](https://signup.snowflake.com/) and create a service account.
# 2. Create the database, schema, and warehouse that the destination table will live in.
# 3. Use [Key Pair Authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) to set up the connection with Snowflake.
# 4. Run the following command to create the secret:
# ```bash
# kubectl create secret generic snowflake --namespace=flyte --from-literal=private_key={your_private_key_above}
# ```
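
# %% [markdown]
# For intuition, here is a quick sketch of how the Snowflake `uri` decomposes. This is illustrative only (plain string handling, not flytekit's actual parser), and the account, database, schema, warehouse, and table names are made up.
# %%
# the first path segment holds "<user>:<your_account>"; the rest are database, schema, warehouse, and table
user_account, database, schema, warehouse, table = (
    "snowflake://jack:my_account/my_db/my_schema/my_wh/my_table"
    .removeprefix("snowflake://")
    .split("/")
)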
# %% [markdown]
# Import the dependencies.
# %%
@@ -118,6 +128,17 @@ def pandas_to_bq() -> StructuredDataset:
return StructuredDataset(dataframe=df, uri="bq://sample-project-1-352610.sample_352610.test1")


# %% [markdown]
# Define a task that converts a pandas DataFrame to a Snowflake table.
# %%
@task
def pandas_to_sf() -> StructuredDataset:
    # create a pandas dataframe
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    # convert the dataframe to StructuredDataset
    return StructuredDataset(dataframe=df, uri="snowflake://<user>:<your_account>/<database>/<schema>/<warehouse>/<table>")


# %% [markdown]
# :::{note}
# The BigQuery uri's format is `bq://<project_name>.<dataset_name>.<table_name>`.
@@ -131,6 +152,19 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
    # convert to pandas dataframe
    return sd.open(pd.DataFrame).all()

# %% [markdown]
# :::{note}
# The Snowflake uri's format is `snowflake://<user>:<your_account>/<database>/<schema>/<warehouse>/<table>`.
# :::

# %% [markdown]
# Define a task that converts the Snowflake table to a pandas DataFrame.
# %%
@task
def sf_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
    # convert to pandas dataframe
    return sd.open(pd.DataFrame).all()
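

# %% [markdown]
# The same `uri` mechanism covers plain object storage. A minimal sketch, assuming a writable bucket at the hypothetical path `s3://my-bucket` (swap in `gs://` for GCS):
# %%
@task
def pandas_to_s3() -> StructuredDataset:
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    # flytekit serializes the dataframe (parquet by default) to the given object-store uri
    return StructuredDataset(dataframe=df, uri="s3://my-bucket/df")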


# %% [markdown]
# :::{note}
@@ -141,8 +175,11 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
# Trigger the tasks locally.
# %%
if __name__ == "__main__":
    o1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1"))
    o2 = pandas_to_bq()
    obj_bq_1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1"))
    obj_bq_2 = pandas_to_bq()

    obj_sf_1 = sf_to_pandas(sd=StructuredDataset(uri="snowflake://<user>:<your_account>/<database>/<schema>/<warehouse>/<table>"))
    obj_sf_2 = pandas_to_sf()

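# %% [markdown]
# To run the same round trip on a Flyte cluster, you would typically compose the tasks in a workflow. A minimal sketch, assuming `workflow` is imported from `flytekit` alongside `task` (the workflow name is made up):
# %%
@workflow
def pandas_bq_roundtrip_wf() -> pd.DataFrame:
    # write the dataframe to BigQuery, then read it back
    sd = pandas_to_bq()
    return bq_to_pandas(sd=sd)
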

# %% [markdown]