[SPARK-50719][PYTHON] Support interruptOperation for PySpark
### What changes were proposed in this pull request?

This PR proposes to support `SparkSession.interruptOperation` for PySpark Classic (previously it was available only with Spark Connect).
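
For illustration, a minimal usage sketch of the API (the operation ID below is a placeholder; how a real ID is obtained depends on how the operation was submitted):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder operation ID for illustration only; in practice the ID would be
# recorded when the operation was started.
op_id = "hypothetical-operation-id"

# Ask the session to interrupt that operation. The call returns the IDs of the
# operations it actually interrupted; per the docstring, the operation may
# still finish just as it is interrupted.
interrupted_ids = spark.interruptOperation(op_id)
print(interrupted_ids)
```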

### Why are the changes needed?

For feature parity with Spark Connect

### Does this PR introduce _any_ user-facing change?

No, this only adds a new API.

### How was this patch tested?

The existing CI should pass

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#49423 from itholic/interrupt_operation.

Authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
itholic authored and zhengruifeng committed Jan 10, 2025
1 parent a4f2870 commit fe3d3ae
Showing 4 changed files with 13 additions and 8 deletions.
2 changes: 1 addition & 1 deletion python/docs/source/reference/pyspark.sql/spark_session.rst
@@ -53,6 +53,7 @@ See also :class:`SparkSession`.
     SparkSession.getActiveSession
     SparkSession.getTags
     SparkSession.interruptAll
+    SparkSession.interruptOperation
     SparkSession.interruptTag
     SparkSession.newSession
     SparkSession.profile
@@ -88,6 +89,5 @@ Spark Connect Only
     SparkSession.clearProgressHandlers
     SparkSession.client
     SparkSession.copyFromLocalToFs
-    SparkSession.interruptOperation
     SparkSession.registerProgressHandler
     SparkSession.removeProgressHandler
17 changes: 12 additions & 5 deletions python/pyspark/sql/session.py
@@ -2253,13 +2253,15 @@ def interruptTag(self, tag: str) -> List[str]:

         return python_list

-    @remote_only
     def interruptOperation(self, op_id: str) -> List[str]:
         """
         Interrupt an operation of this session with the given operationId.

         .. versionadded:: 3.5.0

+        .. versionchanged:: 4.0.0
+            Supports Spark Classic.
+
         Returns
         -------
         list of str
@@ -2269,10 +2271,15 @@ def interruptOperation(self, op_id: str) -> List[str]:
         -----
         There is still a possibility of operation finishing just as it is interrupted.
         """
-        raise PySparkRuntimeError(
-            errorClass="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
-            messageParameters={"feature": "SparkSession.interruptOperation"},
-        )
+        java_list = self._jsparkSession.interruptOperation(op_id)
+        python_list = list()
+
+        # Use iterator to manually iterate through Java list
+        java_iterator = java_list.iterator()
+        while java_iterator.hasNext():
+            python_list.append(str(java_iterator.next()))
+
+        return python_list

     def addTag(self, tag: str) -> None:
         """
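
For context, the new Classic path converts the py4j proxy of the returned Java `List[String]` into a Python list by walking the Java iterator, mirroring `interruptTag` just above it. A standalone sketch of that conversion pattern (the helper name is illustrative, not part of the change):

```python
def _java_string_list_to_python(java_list):
    """Convert a py4j proxy of a java.util.List of strings to a Python list.

    Mirrors the new interruptOperation body: walk the Java iterator and
    stringify each element on the Python side.
    """
    result = []
    it = java_list.iterator()          # java.util.List.iterator()
    while it.hasNext():                # java.util.Iterator.hasNext()
        result.append(str(it.next()))  # java.util.Iterator.next()
    return result
```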
1 change: 0 additions & 1 deletion python/pyspark/sql/tests/test_connect_compatibility.py
@@ -266,7 +266,6 @@ def test_spark_session_compatibility(self):
             "addArtifacts",
             "clearProgressHandlers",
             "copyFromLocalToFs",
-            "interruptOperation",
             "newSession",
             "registerProgressHandler",
             "removeProgressHandler",
1 change: 0 additions & 1 deletion python/pyspark/sql/tests/test_session.py
@@ -227,7 +227,6 @@ def test_unsupported_api(self):
             (lambda: session.client, "client"),
             (session.addArtifacts, "addArtifact(s)"),
             (lambda: session.copyFromLocalToFs("", ""), "copyFromLocalToFs"),
-            (lambda: session.interruptOperation(""), "interruptOperation"),
         ]

         for func, name in unsupported:
