[FEATURE] ColumnValuesNonNull and ColumnValuesNonNullCount metrics #10959

Open. Wants to merge 74 commits into base: develop

NathanFarmer (Contributor) commented Feb 21, 2025

  • Implement ColumnValuesNonNull and ColumnValuesNonNullCount metrics
  • Add a special case for the return type of Metrics whose names end in "condition", so they don't also return domain and value kwargs
  • Use new backend-specific testing pattern established by Expectation integration testing framework
  • Move integration test files into tests/integration/metrics like Expectation integration testing framework
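
To illustrate the special case for condition metrics described above, here is a minimal sketch. It is not the code in this PR: the helper name `unwrap_condition_metric` and the assumed tuple layout (condition first, then domain kwargs and value kwargs) are hypothetical and chosen only for illustration.

```python
from typing import Any


def unwrap_condition_metric(metric_name: str, raw_value: Any) -> Any:
    """Return only the condition for metrics whose name ends in "condition".

    Hypothetical helper: assumes such metrics yield a tuple whose first
    element is the condition and whose remaining elements are the domain
    and value kwargs. Any other metric value is returned unchanged.
    """
    if metric_name.endswith("condition") and isinstance(raw_value, tuple):
        # Drop the trailing kwargs and keep just the condition itself.
        return raw_value[0]
    return raw_value


# Assumed raw shape for a condition metric: (condition, domain kwargs, value kwargs).
raw_condition = ([True, False, True], {"column": "a"}, {})
print(unwrap_condition_metric("column_values.nonnull.condition", raw_condition))

# A non-condition metric (for example a count) passes through untouched.
print(unwrap_condition_metric("column_values.nonnull.count", 2))
```

Keying the behavior on the metric name keeps the special case in one place, at the cost of relying on a naming convention rather than on a type.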

  • Description of PR changes above includes a link to an existing GitHub issue
  • PR title is prefixed with one of: [BUGFIX], [FEATURE], [DOCS], [MAINTENANCE], [CONTRIB]
  • Code is linted - run invoke lint (uses ruff format + ruff check)
  • Appropriate tests and docs have been updated

NathanFarmer and others added 30 commits February 18, 2025 21:33
…ctations/great_expectations into f/gx-40/batch-compute-metrics

codecov bot commented Feb 21, 2025

❌ 3476 Tests Failed:

Tests completed | Failed | Passed | Skipped
31711 | 3476 | 28235 | 9787
View the top 3 failed test(s) by shortest run time
tests.integration.data_sources_and_expectations.expectations.test_expect_column_distinct_values_to_equal_set::test_success_complete_results[sqlite]
Stack Traces | 0.013s run time
self = <sqlalchemy.engine.base.Connection object at 0x7fe54c1cf250>
dialect = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe54c1cf490>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext'>>
statement = 'SELECT 1 \nFROM expectation_test_table_yghescjygg\n LIMIT ? OFFSET ?'
parameters = (1, 0), execution_options = immutabledict({})
args = (<sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 0x7fe54c1cd390>, [], <sqlalchemy.sql.selectable.Select object at 0x7fe54c1cd0d0>, [_OffsetLimitParam('%(140622801198928 param)s', 1, type_=Integer())])
kw = {'cache_hit': symbol('CACHE_MISS')}
branched = <sqlalchemy.engine.base.Connection object at 0x7fe54c1cf250>
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7fe54c1cef90>
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe54c1cdfd0>
cursor = <sqlite3.Cursor object at 0x7fe54790cbc0>, evt_handled = False

    def _execute_context(
        self,
        dialect,
        constructor,
        statement,
        parameters,
        execution_options,
        *args,
        **kw
    ):
        """Create an :class:`.ExecutionContext` and execute, returning
        a :class:`_engine.CursorResult`."""
    
        branched = self
        if self.__branch_from:
            # if this is a "branched" connection, do everything in terms
            # of the "root" connection, *except* for .close(), which is
            # the only feature that branching provides
            self = self.__branch_from
    
        if execution_options:
            yp = execution_options.get("yield_per", None)
            if yp:
                execution_options = execution_options.union(
                    {"stream_results": True, "max_row_buffer": yp}
                )
    
        try:
            conn = self._dbapi_connection
            if conn is None:
                conn = self._revalidate_connection()
    
            context = constructor(
                dialect, self, conn, execution_options, *args, **kw
            )
        except (exc.PendingRollbackError, exc.ResourceClosedError):
            raise
        except BaseException as e:
            self._handle_dbapi_exception(
                e, util.text_type(statement), parameters, None, None
            )
    
        if (
            self._transaction
            and not self._transaction.is_active
            or (
                self._nested_transaction
                and not self._nested_transaction.is_active
            )
        ):
            self._invalid_transaction()
    
        elif self._trans_context_manager:
            TransactionalContext._trans_ctx_check(self)
    
        if self._is_future and self._transaction is None:
            self._autobegin()
    
        context.pre_exec()
    
        if dialect.use_setinputsizes:
            context._set_input_sizes()
    
        cursor, statement, parameters = (
            context.cursor,
            context.statement,
            context.parameters,
        )
    
        if not context.executemany:
            parameters = parameters[0]
    
        if self._has_events or self.engine._has_events:
            for fn in self.dispatch.before_cursor_execute:
                statement, parameters = fn(
                    self,
                    cursor,
                    statement,
                    parameters,
                    context,
                    context.executemany,
                )
    
        if self._echo:
    
            self._log_info(statement)
    
            stats = context._get_cache_stats()
    
            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        parameters, batches=10, ismulti=context.executemany
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to hide_parameters=True]"
                    % (stats,)
                )
    
        evt_handled = False
        try:
            if context.executemany:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_executemany:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_executemany(
                        cursor, statement, parameters, context
                    )
            elif not parameters and context.no_parameters:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute_no_params:
                        if fn(cursor, statement, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute_no_params(
                        cursor, statement, context
                    )
            else:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
>                   self.dialect.do_execute(
                        cursor, statement, parameters, context
                    )

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1910: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe54c1cf490>
cursor = <sqlite3.Cursor object at 0x7fe54790cbc0>
statement = 'SELECT 1 \nFROM expectation_test_table_yghescjygg\n LIMIT ? OFFSET ?'
parameters = (1, 0)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe54c1cdfd0>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlite3.OperationalError: no such table: expectation_test_table_yghescjygg

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/default.py:736: OperationalError

The above exception was the direct cause of the following exception:

self = SqliteTableAsset(name='gkhrkmrvea', type='table', id=None, order_by=[], batch_metadata={}, batch_definitions=[], table_name='expectation_test_table_yghescjygg', schema_name=None)

    @override
    def test_connection(self) -> None:
        """Test the connection for the TableAsset.
    
        Raises:
            TestConnectionError: If the connection test fails.
        """
        datasource: SQLDatasource = self.datasource
        engine: sqlalchemy.Engine = datasource.get_engine()
        inspector: sqlalchemy.Inspector = sa.inspect(engine)
    
        if self.schema_name and self.schema_name not in inspector.get_schema_names():
            raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f'Attempt to connect to table: "{self.qualified_name}" failed because the schema '
                f'"{self.schema_name}" does not exist.'
            )
    
        try:
            with engine.connect() as connection:
                table = sa.table(self.table_name, schema=self.schema_name)
                # don't need to fetch any data, just want to make sure the table is accessible
>               connection.execute(sa.select(1, table).limit(1))

.../datasource/fluent/sql_datasource.py:1068: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1385: in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1953: in _execute_context
    self._handle_dbapi_exception(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/util/compat.py:211: in raise_
    raise exception
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe54c1cf490>
cursor = <sqlite3.Cursor object at 0x7fe54790cbc0>
statement = 'SELECT 1 \nFROM expectation_test_table_yghescjygg\n LIMIT ? OFFSET ?'
parameters = (1, 0)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe54c1cdfd0>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: expectation_test_table_yghescjygg
E       [SQL: SELECT 1 
E       FROM expectation_test_table_yghescjygg
E        LIMIT ? OFFSET ?]
E       [parameters: (1, 0)]
E       (Background on this error at: https://sqlalche..../e/14/e3q8)

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/default.py:736: OperationalError

The above exception was the direct cause of the following exception:

>       lambda: ihook(item=item, **kwds), when=when, reraise=reraise
    )

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../site-packages/flaky/flaky_pytest_plugin.py:146: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/integration/conftest.py:165: in batch_for_datasource
    yield _batch_setup_for_datasource.make_batch()
.../test_utils/data_source_config/sql.py:89: in make_batch
    self.make_asset()
.../test_utils/data_source_config/sqlite.py:77: in make_asset
    ).add_table_asset(name=self._random_resource_name(), table_name=self.table_name)
.../datasource/fluent/sqlite_datasource.py:186: in add_table_asset
    super().add_table_asset(
.../datasource/fluent/sql_datasource.py:1313: in add_table_asset
    return self._add_asset(asset)
.../datasource/fluent/interfaces.py:879: in _add_asset
    asset.test_connection()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = SqliteTableAsset(name='gkhrkmrvea', type='table', id=None, order_by=[], batch_metadata={}, batch_definitions=[], table_name='expectation_test_table_yghescjygg', schema_name=None)

    @override
    def test_connection(self) -> None:
        """Test the connection for the TableAsset.
    
        Raises:
            TestConnectionError: If the connection test fails.
        """
        datasource: SQLDatasource = self.datasource
        engine: sqlalchemy.Engine = datasource.get_engine()
        inspector: sqlalchemy.Inspector = sa.inspect(engine)
    
        if self.schema_name and self.schema_name not in inspector.get_schema_names():
            raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f'Attempt to connect to table: "{self.qualified_name}" failed because the schema '
                f'"{self.schema_name}" does not exist.'
            )
    
        try:
            with engine.connect() as connection:
                table = sa.table(self.table_name, schema=self.schema_name)
                # don't need to fetch any data, just want to make sure the table is accessible
                connection.execute(sa.select(1, table).limit(1))
        except Exception as query_error:
            LOGGER.info(f"{self.name} `.test_connection()` query failed: {query_error!r}")
>           raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f"Attempt to connect to table: {self.qualified_name} failed because the test query "
                f"failed. Ensure the table exists and the user has access to select data from the table: {query_error}"  # noqa: E501 # FIXME CoP
            ) from query_error
E           great_expectations.datasource.fluent.interfaces.TestConnectionError: Attempt to connect to table: expectation_test_table_yghescjygg failed because the test query failed. Ensure the table exists and the user has access to select data from the table: (sqlite3.OperationalError) no such table: expectation_test_table_yghescjygg
E           [SQL: SELECT 1 
E           FROM expectation_test_table_yghescjygg
E            LIMIT ? OFFSET ?]
E           [parameters: (1, 0)]
E           (Background on this error at: https://sqlalche..../e/14/e3q8)

.../datasource/fluent/sql_datasource.py:1071: TestConnectionError
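
The failure pattern in the trace above (a probe query against a table that does not exist, re-raised as a connection error chained to the original exception) can be reproduced in isolation. The sketch below uses only the standard-library `sqlite3` module; `TableCheckError`, `check_table`, and the table name are hypothetical stand-ins, not Great Expectations or SQLAlchemy APIs.

```python
import sqlite3


class TableCheckError(Exception):
    """Hypothetical stand-in for the connection error seen in the trace."""


def check_table(conn: sqlite3.Connection, table_name: str) -> None:
    """Run a cheap probe query; wrap any failure and keep the original cause."""
    try:
        # No data is needed, only confirmation that the table is reachable.
        conn.execute(f'SELECT 1 FROM "{table_name}" LIMIT 1')
    except sqlite3.OperationalError as query_error:
        raise TableCheckError(
            f"Attempt to connect to table: {table_name} failed: {query_error}"
        ) from query_error


conn = sqlite3.connect(":memory:")

# The table has not been created yet, so the probe fails as in the CI run.
try:
    check_table(conn, "expectation_test_table_demo")
except TableCheckError as err:
    print(err)
    print(repr(err.__cause__))

# Once the table exists on the same connection, the probe succeeds.
conn.execute('CREATE TABLE "expectation_test_table_demo" (a INTEGER)')
check_table(conn, "expectation_test_table_demo")
```

Because every reported failure carries the same "no such table" cause, a reasonable first check is whether the test fixtures create the table on the same database that the asset later connects to. This is a guess from the trace, not something the report confirms.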
tests.integration.data_sources_and_expectations.expectations.test_expect_column_values_to_not_match_like_pattern_list.TestNormalSql::test_failure[sqlite-one_pattern]
Stack Traces | 0.013s run time
self = <sqlalchemy.engine.base.Connection object at 0x7fe546d85b10>
dialect = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe54798c910>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext'>>
statement = 'SELECT 1 \nFROM expectation_test_table_nnwdibkvmn\n LIMIT ? OFFSET ?'
parameters = (1, 0), execution_options = immutabledict({})
args = (<sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 0x7fe54794f150>, [], <sqlalchemy.sql.selectable.Select object at 0x7fe54794cfd0>, [_OffsetLimitParam('%(140622725170448 param)s', 1, type_=Integer())])
kw = {'cache_hit': symbol('CACHE_MISS')}
branched = <sqlalchemy.engine.base.Connection object at 0x7fe546d85b10>
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7fe546d85c90>
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe54794c050>
cursor = <sqlite3.Cursor object at 0x7fe54726eec0>, evt_handled = False

    def _execute_context(
        self,
        dialect,
        constructor,
        statement,
        parameters,
        execution_options,
        *args,
        **kw
    ):
        """Create an :class:`.ExecutionContext` and execute, returning
        a :class:`_engine.CursorResult`."""
    
        branched = self
        if self.__branch_from:
            # if this is a "branched" connection, do everything in terms
            # of the "root" connection, *except* for .close(), which is
            # the only feature that branching provides
            self = self.__branch_from
    
        if execution_options:
            yp = execution_options.get("yield_per", None)
            if yp:
                execution_options = execution_options.union(
                    {"stream_results": True, "max_row_buffer": yp}
                )
    
        try:
            conn = self._dbapi_connection
            if conn is None:
                conn = self._revalidate_connection()
    
            context = constructor(
                dialect, self, conn, execution_options, *args, **kw
            )
        except (exc.PendingRollbackError, exc.ResourceClosedError):
            raise
        except BaseException as e:
            self._handle_dbapi_exception(
                e, util.text_type(statement), parameters, None, None
            )
    
        if (
            self._transaction
            and not self._transaction.is_active
            or (
                self._nested_transaction
                and not self._nested_transaction.is_active
            )
        ):
            self._invalid_transaction()
    
        elif self._trans_context_manager:
            TransactionalContext._trans_ctx_check(self)
    
        if self._is_future and self._transaction is None:
            self._autobegin()
    
        context.pre_exec()
    
        if dialect.use_setinputsizes:
            context._set_input_sizes()
    
        cursor, statement, parameters = (
            context.cursor,
            context.statement,
            context.parameters,
        )
    
        if not context.executemany:
            parameters = parameters[0]
    
        if self._has_events or self.engine._has_events:
            for fn in self.dispatch.before_cursor_execute:
                statement, parameters = fn(
                    self,
                    cursor,
                    statement,
                    parameters,
                    context,
                    context.executemany,
                )
    
        if self._echo:
    
            self._log_info(statement)
    
            stats = context._get_cache_stats()
    
            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        parameters, batches=10, ismulti=context.executemany
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to hide_parameters=True]"
                    % (stats,)
                )
    
        evt_handled = False
        try:
            if context.executemany:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_executemany:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_executemany(
                        cursor, statement, parameters, context
                    )
            elif not parameters and context.no_parameters:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute_no_params:
                        if fn(cursor, statement, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute_no_params(
                        cursor, statement, context
                    )
            else:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
>                   self.dialect.do_execute(
                        cursor, statement, parameters, context
                    )

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1910: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe54798c910>
cursor = <sqlite3.Cursor object at 0x7fe54726eec0>
statement = 'SELECT 1 \nFROM expectation_test_table_nnwdibkvmn\n LIMIT ? OFFSET ?'
parameters = (1, 0)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe54794c050>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlite3.OperationalError: no such table: expectation_test_table_nnwdibkvmn

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/default.py:736: OperationalError

The above exception was the direct cause of the following exception:

self = SqliteTableAsset(name='fmrcwkfdnm', type='table', id=None, order_by=[], batch_metadata={}, batch_definitions=[], table_name='expectation_test_table_nnwdibkvmn', schema_name=None)

    @override
    def test_connection(self) -> None:
        """Test the connection for the TableAsset.
    
        Raises:
            TestConnectionError: If the connection test fails.
        """
        datasource: SQLDatasource = self.datasource
        engine: sqlalchemy.Engine = datasource.get_engine()
        inspector: sqlalchemy.Inspector = sa.inspect(engine)
    
        if self.schema_name and self.schema_name not in inspector.get_schema_names():
            raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f'Attempt to connect to table: "{self.qualified_name}" failed because the schema '
                f'"{self.schema_name}" does not exist.'
            )
    
        try:
            with engine.connect() as connection:
                table = sa.table(self.table_name, schema=self.schema_name)
                # don't need to fetch any data, just want to make sure the table is accessible
>               connection.execute(sa.select(1, table).limit(1))

.../datasource/fluent/sql_datasource.py:1068: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1385: in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1953: in _execute_context
    self._handle_dbapi_exception(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/util/compat.py:211: in raise_
    raise exception
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe54798c910>
cursor = <sqlite3.Cursor object at 0x7fe54726eec0>
statement = 'SELECT 1 \nFROM expectation_test_table_nnwdibkvmn\n LIMIT ? OFFSET ?'
parameters = (1, 0)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe54794c050>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: expectation_test_table_nnwdibkvmn
E       [SQL: SELECT 1 
E       FROM expectation_test_table_nnwdibkvmn
E        LIMIT ? OFFSET ?]
E       [parameters: (1, 0)]
E       (Background on this error at: https://sqlalche..../e/14/e3q8)

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/default.py:736: OperationalError

The above exception was the direct cause of the following exception:

>       lambda: ihook(item=item, **kwds), when=when, reraise=reraise
    )

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../site-packages/flaky/flaky_pytest_plugin.py:146: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/integration/conftest.py:165: in batch_for_datasource
    yield _batch_setup_for_datasource.make_batch()
.../test_utils/data_source_config/sql.py:89: in make_batch
    self.make_asset()
.../test_utils/data_source_config/sqlite.py:77: in make_asset
    ).add_table_asset(name=self._random_resource_name(), table_name=self.table_name)
.../datasource/fluent/sqlite_datasource.py:186: in add_table_asset
    super().add_table_asset(
.../datasource/fluent/sql_datasource.py:1313: in add_table_asset
    return self._add_asset(asset)
.../datasource/fluent/interfaces.py:879: in _add_asset
    asset.test_connection()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = SqliteTableAsset(name='fmrcwkfdnm', type='table', id=None, order_by=[], batch_metadata={}, batch_definitions=[], table_name='expectation_test_table_nnwdibkvmn', schema_name=None)

    @override
    def test_connection(self) -> None:
        """Test the connection for the TableAsset.
    
        Raises:
            TestConnectionError: If the connection test fails.
        """
        datasource: SQLDatasource = self.datasource
        engine: sqlalchemy.Engine = datasource.get_engine()
        inspector: sqlalchemy.Inspector = sa.inspect(engine)
    
        if self.schema_name and self.schema_name not in inspector.get_schema_names():
            raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f'Attempt to connect to table: "{self.qualified_name}" failed because the schema '
                f'"{self.schema_name}" does not exist.'
            )
    
        try:
            with engine.connect() as connection:
                table = sa.table(self.table_name, schema=self.schema_name)
                # don't need to fetch any data, just want to make sure the table is accessible
                connection.execute(sa.select(1, table).limit(1))
        except Exception as query_error:
            LOGGER.info(f"{self.name} `.test_connection()` query failed: {query_error!r}")
>           raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f"Attempt to connect to table: {self.qualified_name} failed because the test query "
                f"failed. Ensure the table exists and the user has access to select data from the table: {query_error}"  # noqa: E501 # FIXME CoP
            ) from query_error
E           great_expectations.datasource.fluent.interfaces.TestConnectionError: Attempt to connect to table: expectation_test_table_nnwdibkvmn failed because the test query failed. Ensure the table exists and the user has access to select data from the table: (sqlite3.OperationalError) no such table: expectation_test_table_nnwdibkvmn
E           [SQL: SELECT 1 
E           FROM expectation_test_table_nnwdibkvmn
E            LIMIT ? OFFSET ?]
E           [parameters: (1, 0)]
E           (Background on this error at: https://sqlalche..../e/14/e3q8)

.../datasource/fluent/sql_datasource.py:1071: TestConnectionError
tests.integration.data_sources_and_expectations.test_misconfigured_expectations::test_column_min_max_mismatch_misconfiguration[sqlite]
Stack Traces | 0.013s run time
self = <sqlalchemy.engine.base.Connection object at 0x7fe547ba2790>
dialect = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe5470ec650>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext'>>
statement = 'SELECT 1 \nFROM expectation_test_table_qnygciqaxw\n LIMIT ? OFFSET ?'
parameters = (1, 0), execution_options = immutabledict({})
args = (<sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 0x7fe5468e87d0>, [], <sqlalchemy.sql.selectable.Select object at 0x7fe5468ea2d0>, [_OffsetLimitParam('%(140622707983760 param)s', 1, type_=Integer())])
kw = {'cache_hit': symbol('CACHE_MISS')}
branched = <sqlalchemy.engine.base.Connection object at 0x7fe547ba2790>
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7fe547ba07d0>
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe5468ebdd0>
cursor = <sqlite3.Cursor object at 0x7fe546f998c0>, evt_handled = False

    def _execute_context(
        self,
        dialect,
        constructor,
        statement,
        parameters,
        execution_options,
        *args,
        **kw
    ):
        """Create an :class:`.ExecutionContext` and execute, returning
        a :class:`_engine.CursorResult`."""
    
        branched = self
        if self.__branch_from:
            # if this is a "branched" connection, do everything in terms
            # of the "root" connection, *except* for .close(), which is
            # the only feature that branching provides
            self = self.__branch_from
    
        if execution_options:
            yp = execution_options.get("yield_per", None)
            if yp:
                execution_options = execution_options.union(
                    {"stream_results": True, "max_row_buffer": yp}
                )
    
        try:
            conn = self._dbapi_connection
            if conn is None:
                conn = self._revalidate_connection()
    
            context = constructor(
                dialect, self, conn, execution_options, *args, **kw
            )
        except (exc.PendingRollbackError, exc.ResourceClosedError):
            raise
        except BaseException as e:
            self._handle_dbapi_exception(
                e, util.text_type(statement), parameters, None, None
            )
    
        if (
            self._transaction
            and not self._transaction.is_active
            or (
                self._nested_transaction
                and not self._nested_transaction.is_active
            )
        ):
            self._invalid_transaction()
    
        elif self._trans_context_manager:
            TransactionalContext._trans_ctx_check(self)
    
        if self._is_future and self._transaction is None:
            self._autobegin()
    
        context.pre_exec()
    
        if dialect.use_setinputsizes:
            context._set_input_sizes()
    
        cursor, statement, parameters = (
            context.cursor,
            context.statement,
            context.parameters,
        )
    
        if not context.executemany:
            parameters = parameters[0]
    
        if self._has_events or self.engine._has_events:
            for fn in self.dispatch.before_cursor_execute:
                statement, parameters = fn(
                    self,
                    cursor,
                    statement,
                    parameters,
                    context,
                    context.executemany,
                )
    
        if self._echo:
    
            self._log_info(statement)
    
            stats = context._get_cache_stats()
    
            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        parameters, batches=10, ismulti=context.executemany
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to hide_parameters=True]"
                    % (stats,)
                )
    
        evt_handled = False
        try:
            if context.executemany:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_executemany:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_executemany(
                        cursor, statement, parameters, context
                    )
            elif not parameters and context.no_parameters:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute_no_params:
                        if fn(cursor, statement, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute_no_params(
                        cursor, statement, context
                    )
            else:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
>                   self.dialect.do_execute(
                        cursor, statement, parameters, context
                    )

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1910: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe5470ec650>
cursor = <sqlite3.Cursor object at 0x7fe546f998c0>
statement = 'SELECT 1 \nFROM expectation_test_table_qnygciqaxw\n LIMIT ? OFFSET ?'
parameters = (1, 0)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe5468ebdd0>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlite3.OperationalError: no such table: expectation_test_table_qnygciqaxw

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/default.py:736: OperationalError

The above exception was the direct cause of the following exception:

self = SqliteTableAsset(name='yspisgpxhb', type='table', id=None, order_by=[], batch_metadata={}, batch_definitions=[], table_name='expectation_test_table_qnygciqaxw', schema_name=None)

    @override
    def test_connection(self) -> None:
        """Test the connection for the TableAsset.
    
        Raises:
            TestConnectionError: If the connection test fails.
        """
        datasource: SQLDatasource = self.datasource
        engine: sqlalchemy.Engine = datasource.get_engine()
        inspector: sqlalchemy.Inspector = sa.inspect(engine)
    
        if self.schema_name and self.schema_name not in inspector.get_schema_names():
            raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f'Attempt to connect to table: "{self.qualified_name}" failed because the schema '
                f'"{self.schema_name}" does not exist.'
            )
    
        try:
            with engine.connect() as connection:
                table = sa.table(self.table_name, schema=self.schema_name)
                # don't need to fetch any data, just want to make sure the table is accessible
>               connection.execute(sa.select(1, table).limit(1))

.../datasource/fluent/sql_datasource.py:1068: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1385: in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1953: in _execute_context
    self._handle_dbapi_exception(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/util/compat.py:211: in raise_
    raise exception
.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7fe5470ec650>
cursor = <sqlite3.Cursor object at 0x7fe546f998c0>
statement = 'SELECT 1 \nFROM expectation_test_table_qnygciqaxw\n LIMIT ? OFFSET ?'
parameters = (1, 0)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x7fe5468ebdd0>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: expectation_test_table_qnygciqaxw
E       [SQL: SELECT 1 
E       FROM expectation_test_table_qnygciqaxw
E        LIMIT ? OFFSET ?]
E       [parameters: (1, 0)]
E       (Background on this error at: https://sqlalche..../e/14/e3q8)

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../sqlalchemy/engine/default.py:736: OperationalError

The above exception was the direct cause of the following exception:

>       lambda: ihook(item=item, **kwds), when=when, reraise=reraise
    )

.../hostedtoolcache/Python/3.11.11................................./x64/lib/python3.11.../site-packages/flaky/flaky_pytest_plugin.py:146: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/integration/conftest.py:165: in batch_for_datasource
    yield _batch_setup_for_datasource.make_batch()
.../test_utils/data_source_config/sql.py:89: in make_batch
    self.make_asset()
.../test_utils/data_source_config/sqlite.py:77: in make_asset
    ).add_table_asset(name=self._random_resource_name(), table_name=self.table_name)
.../datasource/fluent/sqlite_datasource.py:186: in add_table_asset
    super().add_table_asset(
.../datasource/fluent/sql_datasource.py:1313: in add_table_asset
    return self._add_asset(asset)
.../datasource/fluent/interfaces.py:879: in _add_asset
    asset.test_connection()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = SqliteTableAsset(name='yspisgpxhb', type='table', id=None, order_by=[], batch_metadata={}, batch_definitions=[], table_name='expectation_test_table_qnygciqaxw', schema_name=None)

    @override
    def test_connection(self) -> None:
        """Test the connection for the TableAsset.
    
        Raises:
            TestConnectionError: If the connection test fails.
        """
        datasource: SQLDatasource = self.datasource
        engine: sqlalchemy.Engine = datasource.get_engine()
        inspector: sqlalchemy.Inspector = sa.inspect(engine)
    
        if self.schema_name and self.schema_name not in inspector.get_schema_names():
            raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f'Attempt to connect to table: "{self.qualified_name}" failed because the schema '
                f'"{self.schema_name}" does not exist.'
            )
    
        try:
            with engine.connect() as connection:
                table = sa.table(self.table_name, schema=self.schema_name)
                # don't need to fetch any data, just want to make sure the table is accessible
                connection.execute(sa.select(1, table).limit(1))
        except Exception as query_error:
            LOGGER.info(f"{self.name} `.test_connection()` query failed: {query_error!r}")
>           raise TestConnectionError(  # noqa: TRY003 # FIXME CoP
                f"Attempt to connect to table: {self.qualified_name} failed because the test query "
                f"failed. Ensure the table exists and the user has access to select data from the table: {query_error}"  # noqa: E501 # FIXME CoP
            ) from query_error
E           great_expectations.datasource.fluent.interfaces.TestConnectionError: Attempt to connect to table: expectation_test_table_qnygciqaxw failed because the test query failed. Ensure the table exists and the user has access to select data from the table: (sqlite3.OperationalError) no such table: expectation_test_table_qnygciqaxw
E           [SQL: SELECT 1 
E           FROM expectation_test_table_qnygciqaxw
E            LIMIT ? OFFSET ?]
E           [parameters: (1, 0)]
E           (Background on this error at: https://sqlalche..../e/14/e3q8)

.../datasource/fluent/sql_datasource.py:1071: TestConnectionError
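
The chained failure above can be reproduced in isolation with the standard-library `sqlite3` module. This is only a minimal sketch, not the project's code: `ConnectionCheckError` is a local stand-in for the wrapper exception, `check_table` is a hypothetical helper, and the table name is made up. It shows how an `OperationalError` for a missing table becomes the `__cause__` of the wrapper through `raise ... from`, which is what produces the "direct cause" sections in the trace.

```python
import sqlite3


class ConnectionCheckError(Exception):
    """Local stand-in for a wrapper exception (assumption, not the library class)."""


def check_table(connection: sqlite3.Connection, table_name: str) -> None:
    """Hypothetical helper: run a cheap query to confirm the table is reachable."""
    try:
        # No rows are needed; the query only has to compile and execute.
        connection.execute(f'SELECT 1 FROM "{table_name}" LIMIT 1')
    except sqlite3.Error as query_error:
        # `from query_error` keeps the original database error as __cause__.
        raise ConnectionCheckError(
            f"Test query against {table_name} failed: {query_error}"
        ) from query_error


connection = sqlite3.connect(":memory:")
cause = None
try:
    # The in-memory database is empty, so this table does not exist.
    check_table(connection, "expectation_test_table_missing")
except ConnectionCheckError as err:
    cause = err.__cause__
    print(type(cause).__name__, "->", cause)
finally:
    connection.close()
```

If the table were created on the same connection before the check, the query would succeed and no exception would be raised, which suggests the failing tests are reaching the asset check before (or without) the table setup step.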


@NathanFarmer NathanFarmer marked this pull request as ready for review February 21, 2025 21:33


class MetricNameSuffix(str, Enum):
    CONDITION = "condition"
Contributor Author

I added this because condition gets special treatment. Hopefully we won't find any more special cases like that and we can pare this file down.
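
For readers skimming the thread, here is a rough sketch of how a suffix enum like this could be used to detect the special case. Only `MetricNameSuffix` and its `CONDITION` member come from the diff; `is_condition_metric` and the metric names in the usage lines are hypothetical and purely illustrative, not the PR's implementation.

```python
from enum import Enum


class MetricNameSuffix(str, Enum):
    CONDITION = "condition"


def is_condition_metric(metric_name: str) -> bool:
    """Hypothetical helper: True when the dotted metric name ends in 'condition'."""
    # Compare only the last dotted segment so that a name merely containing
    # the word elsewhere does not match.
    return metric_name.split(".")[-1] == MetricNameSuffix.CONDITION.value


# Illustrative names only; a caller could branch on this flag to choose a
# different return type for condition metrics.
condition_flag = is_condition_metric("column_values.nonnull.condition")
count_flag = is_condition_metric("column_values.nonnull.count")
```

Matching on the final segment rather than calling `str.endswith` on the whole name is a design choice of this sketch: it avoids treating a hypothetical name such as `foo.precondition` as a condition metric.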
