Update API for node2vec and biased random walks #4841

Open: wants to merge 27 commits into branch-25.02
Changes from 1 commit (of 27 commits)
86b2038  add support for rng state (jnke2016, Dec 28, 2024)
38690a6  update test to take rng state parameter (jnke2016, Dec 28, 2024)
fd5b387  add support for rng state (jnke2016, Dec 28, 2024)
9d56b5f  deprecate old API (jnke2016, Dec 28, 2024)
615837e  add new API for node2vec random walks (jnke2016, Dec 28, 2024)
21a76bb  add mg node2vec random walks to the python API (jnke2016, Dec 28, 2024)
86a13d3  update docstrings (jnke2016, Dec 28, 2024)
4c8744f  enable mg node2vec_random walks (jnke2016, Dec 28, 2024)
4da1c7e  update argument list in function call (jnke2016, Dec 28, 2024)
6984645  support optional weights (jnke2016, Dec 28, 2024)
d04588a  update docstring and deprecate arguments (jnke2016, Dec 31, 2024)
cb6a294  add new API for uniform_random_walks (jnke2016, Dec 31, 2024)
7936026  deprecate method (jnke2016, Dec 31, 2024)
7a5056f  update copyrights (jnke2016, Dec 31, 2024)
e2e4694  add uniform random walks (jnke2016, Dec 31, 2024)
877265b  add new API for node2vec random walks (jnke2016, Dec 31, 2024)
bb77237  deprecate legacy implementation (jnke2016, Dec 31, 2024)
618fe76  add random state argumment and update copyright (jnke2016, Dec 31, 2024)
a1d004c  update header file to take as input a random state (jnke2016, Dec 31, 2024)
bea2a2f  add support for rng state as input (jnke2016, Dec 31, 2024)
755acc7  update tests to support rng state as input (jnke2016, Dec 31, 2024)
ef00fa5  add biased random walks to the PLC API (jnke2016, Jan 1, 2025)
ae4833c  add biased random walks to the python API (jnke2016, Jan 1, 2025)
8314291  update docstrings and init file (jnke2016, Jan 1, 2025)
1603bcd  fix typo (jnke2016, Jan 1, 2025)
0a03b29  update copyright (jnke2016, Jan 1, 2025)
88e405d  add mg implementation of biased and uniform random walks (jnke2016, Jan 1, 2025)
add mg implementation of biased and uniform random walks
jnke2016 committed Jan 1, 2025
commit 88e405df171c748da7973ec2ae0b2f77aad55c2f
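
Both new files in this commit assume a running Dask-CUDA cluster with cuGraph comms initialized and a multi-GPU cugraph.Graph built from a dask_cudf edge list. A minimal setup sketch under those assumptions (the file name, column names, and dtypes are illustrative and not part of this PR):

# Minimal multi-GPU setup sketch; "edges.csv" and its column names/dtypes are assumptions.
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cugraph
from cugraph.dask.comms import comms as Comms

cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize(p2p=True)

# Build a weighted, directed multi-GPU graph from a distributed edge list.
edges = dask_cudf.read_csv(
    "edges.csv",
    names=["src", "dst", "wgt"],
    dtype=["int32", "int32", "float32"],
)
G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(edges, source="src", destination="dst", edge_attr="wgt")
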
171 changes: 171 additions & 0 deletions python/cugraph/cugraph/dask/sampling/biased_random_walks.py
@@ -0,0 +1,171 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dask.distributed import wait, default_client
import dask_cudf
import cudf
import operator as op
from cugraph.dask.common.part_utils import (
persist_dask_df_equal_parts_per_worker,
)

from pylibcugraph import ResourceHandle

from pylibcugraph import (
biased_random_walks as pylibcugraph_biased_random_walks,
)

from cugraph.dask.comms import comms as Comms


def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
"""
    Create a cudf Series from a cupy array returned by the pylibcugraph wrapper.
"""

if is_vertex_paths and len(cp_paths) > 0:
if number_map.implementation.numbered:
df_ = cudf.DataFrame()
df_["vertex_paths"] = cp_paths
df_ = number_map.unrenumber(
df_, "vertex_paths", preserve_order=True
).compute()
vertex_paths = cudf.Series(df_["vertex_paths"]).fillna(-1)

return vertex_paths

return cudf.Series(cp_paths)


def _call_plc_biased_random_walks(sID, mg_graph_x, st_x, max_depth, random_state):

return pylibcugraph_biased_random_walks(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
input_graph=mg_graph_x,
start_vertices=st_x,
max_length=max_depth,
random_state=random_state,
)


def biased_random_walks(
input_graph,
start_vertices=None,
max_depth=None,
random_state=None
):
"""
    Compute random walks under the biased sampling framework for each node in
    'start_vertices' and return a padded result along with the maximum path
    length. Walks that end early at a vertex with no outgoing edges are padded
    with -1.

    Parameters
    ----------
    input_graph : cuGraph.Graph
        The graph can be either directed or undirected.

    start_vertices : int or list or cudf.Series or cudf.DataFrame
        A single vertex, or a list or cudf.Series of vertices, from which to
        start the random walks. For multi-column vertices, pass a
        cudf.DataFrame.

    max_depth : int
        The maximum depth of the random walks.

    random_state : int, optional
        Random seed to use when making sampling calls.


    Returns
    -------
    vertex_paths : dask_cudf.Series or dask_cudf.DataFrame
        Series containing the vertices along each random walk path.

    edge_weight_paths : dask_cudf.Series
        Series containing the edge weights of the edges traversed along the
        returned vertex_paths.

    max_path_length : int
        The maximum path length.
"""
client = default_client()
if isinstance(start_vertices, int):
start_vertices = [start_vertices]

if isinstance(start_vertices, list):
start_vertices = cudf.Series(start_vertices)

# start_vertices uses "external" vertex IDs, but if the graph has been
# renumbered, the start vertex IDs must also be renumbered.
if input_graph.renumbered:
        # FIXME: This should cast start_vertices to the renumbered dataframe's
        # type, but verify that. If not, retrieve the type and cast it when
        # creating the dask_cudf from a cudf
start_vertices = input_graph.lookup_internal_vertex_id(start_vertices).compute()
start_vertices_type = input_graph.edgelist.edgelist_df.dtypes[0]
else:
# FIXME: Get the 'src' column names instead and retrieve the type
start_vertices_type = input_graph.input_df.dtypes.iloc[0]
start_vertices = dask_cudf.from_cudf(
start_vertices, npartitions=min(input_graph._npartitions, len(start_vertices))
)
start_vertices = start_vertices.astype(start_vertices_type)
start_vertices = persist_dask_df_equal_parts_per_worker(
start_vertices, client, return_type="dict"
)

result = [
client.submit(
_call_plc_biased_random_walks,
Comms.get_session_id(),
input_graph._plc_graph[w],
start_v[0] if start_v else cudf.Series(dtype=start_vertices_type),
max_depth,
random_state=random_state,
workers=[w],
allow_other_workers=False,
)
for w, start_v in start_vertices.items()
]

wait(result)

result_vertex_paths = [client.submit(op.getitem, f, 0) for f in result]
result_edge_wgt_paths = [client.submit(op.getitem, f, 1) for f in result]

cudf_vertex_paths = [
client.submit(convert_to_cudf, cp_vertex_paths, input_graph.renumber_map, True)
for cp_vertex_paths in result_vertex_paths
]

cudf_edge_wgt_paths = [
client.submit(convert_to_cudf, cp_edge_wgt_paths)
for cp_edge_wgt_paths in result_edge_wgt_paths
]

wait([cudf_vertex_paths, cudf_edge_wgt_paths])

ddf_vertex_paths = dask_cudf.from_delayed(cudf_vertex_paths).persist()
ddf_edge_wgt_paths = dask_cudf.from_delayed(cudf_edge_wgt_paths).persist()
wait([ddf_vertex_paths, ddf_edge_wgt_paths])

# Wait until the inactive futures are released
wait(
[
(r.release(), c_v.release(), c_e.release())
for r, c_v, c_e in zip(result, cudf_vertex_paths, cudf_edge_wgt_paths)
]
)

return ddf_vertex_paths, ddf_edge_wgt_paths, max_depth
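
A usage sketch for the wrapper above, assuming the multi-GPU graph G from the earlier setup sketch; the seed vertices, max_depth, and random_state values are illustrative:

# Run biased random walks from a few seed vertices on the MG graph G.
from cugraph.dask.sampling.biased_random_walks import biased_random_walks

seeds = [0, 1, 2]
vertex_paths, edge_wgt_paths, max_path_len = biased_random_walks(
    G, start_vertices=seeds, max_depth=4, random_state=42
)

# The first two results are distributed dask_cudf objects; materialize them on
# the client only when they are small enough to fit on one GPU.
print(vertex_paths.compute())
print(edge_wgt_paths.compute())
print(max_path_len)
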
171 changes: 171 additions & 0 deletions python/cugraph/cugraph/dask/sampling/uniform_random_walks.py
@@ -0,0 +1,171 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dask.distributed import wait, default_client
import dask_cudf
import cudf
import operator as op
from cugraph.dask.common.part_utils import (
persist_dask_df_equal_parts_per_worker,
)

from pylibcugraph import ResourceHandle

from pylibcugraph import (
uniform_random_walks as pylibcugraph_uniform_random_walks,
)

from cugraph.dask.comms import comms as Comms


def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
"""
    Create a cudf Series from a cupy array returned by the pylibcugraph wrapper.
"""

if is_vertex_paths and len(cp_paths) > 0:
if number_map.implementation.numbered:
df_ = cudf.DataFrame()
df_["vertex_paths"] = cp_paths
df_ = number_map.unrenumber(
df_, "vertex_paths", preserve_order=True
).compute()
vertex_paths = cudf.Series(df_["vertex_paths"]).fillna(-1)

return vertex_paths

return cudf.Series(cp_paths)


def _call_plc_uniform_random_walks(sID, mg_graph_x, st_x, max_depth, random_state):

return pylibcugraph_uniform_random_walks(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
input_graph=mg_graph_x,
start_vertices=st_x,
max_length=max_depth,
random_state=random_state,
)


def uniform_random_walks(
input_graph,
start_vertices=None,
max_depth=None,
random_state=None
):
"""
    Compute random walks under the uniform sampling framework for each node in
    'start_vertices' and return a padded result along with the maximum path
    length. Walks that end early at a vertex with no outgoing edges are padded
    with -1.

    Parameters
    ----------
    input_graph : cuGraph.Graph
        The graph can be either directed or undirected.

    start_vertices : int or list or cudf.Series or cudf.DataFrame
        A single vertex, or a list or cudf.Series of vertices, from which to
        start the random walks. For multi-column vertices, pass a
        cudf.DataFrame.

    max_depth : int
        The maximum depth of the random walks.

    random_state : int, optional
        Random seed to use when making sampling calls.


    Returns
    -------
    vertex_paths : dask_cudf.Series or dask_cudf.DataFrame
        Series containing the vertices along each random walk path.

    edge_weight_paths : dask_cudf.Series
        Series containing the edge weights of the edges traversed along the
        returned vertex_paths.

    max_path_length : int
        The maximum path length.
"""
client = default_client()
if isinstance(start_vertices, int):
start_vertices = [start_vertices]

if isinstance(start_vertices, list):
start_vertices = cudf.Series(start_vertices)

# start_vertices uses "external" vertex IDs, but if the graph has been
# renumbered, the start vertex IDs must also be renumbered.
if input_graph.renumbered:
        # FIXME: This should cast start_vertices to the renumbered dataframe's
        # type, but verify that. If not, retrieve the type and cast it when
        # creating the dask_cudf from a cudf
start_vertices = input_graph.lookup_internal_vertex_id(start_vertices).compute()
start_vertices_type = input_graph.edgelist.edgelist_df.dtypes[0]
else:
# FIXME: Get the 'src' column names instead and retrieve the type
start_vertices_type = input_graph.input_df.dtypes.iloc[0]
start_vertices = dask_cudf.from_cudf(
start_vertices, npartitions=min(input_graph._npartitions, len(start_vertices))
)
start_vertices = start_vertices.astype(start_vertices_type)
start_vertices = persist_dask_df_equal_parts_per_worker(
start_vertices, client, return_type="dict"
)

result = [
client.submit(
_call_plc_uniform_random_walks,
Comms.get_session_id(),
input_graph._plc_graph[w],
start_v[0] if start_v else cudf.Series(dtype=start_vertices_type),
max_depth,
random_state=random_state,
workers=[w],
allow_other_workers=False,
)
for w, start_v in start_vertices.items()
]

wait(result)

result_vertex_paths = [client.submit(op.getitem, f, 0) for f in result]
result_edge_wgt_paths = [client.submit(op.getitem, f, 1) for f in result]

cudf_vertex_paths = [
client.submit(convert_to_cudf, cp_vertex_paths, input_graph.renumber_map, True)
for cp_vertex_paths in result_vertex_paths
]

cudf_edge_wgt_paths = [
client.submit(convert_to_cudf, cp_edge_wgt_paths)
for cp_edge_wgt_paths in result_edge_wgt_paths
]

wait([cudf_vertex_paths, cudf_edge_wgt_paths])

ddf_vertex_paths = dask_cudf.from_delayed(cudf_vertex_paths).persist()
ddf_edge_wgt_paths = dask_cudf.from_delayed(cudf_edge_wgt_paths).persist()
wait([ddf_vertex_paths, ddf_edge_wgt_paths])

# Wait until the inactive futures are released
wait(
[
(r.release(), c_v.release(), c_e.release())
for r, c_v, c_e in zip(result, cudf_vertex_paths, cudf_edge_wgt_paths)
]
)

return ddf_vertex_paths, ddf_edge_wgt_paths, max_depth
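
The uniform wrapper is called the same way. A sketch of one way to unpack the padded result, again using the graph G from the setup sketch and assuming each seed owns one fixed-width slice of vertex_paths (the stride is derived from the result size rather than hard-coded):

# Uniform random walks plus a simple reshape of the padded vertex paths.
from cugraph.dask.sampling.uniform_random_walks import uniform_random_walks

seeds = [0, 1, 2]
vertex_paths, edge_wgt_paths, max_path_len = uniform_random_walks(
    G, start_vertices=seeds, max_depth=4, random_state=42
)

v = vertex_paths.compute().to_cupy()    # padded vertex ids; -1 marks early termination
stride = len(v) // len(seeds)           # per-seed path width derived from the result size
walks = v.reshape(len(seeds), stride)   # one row of vertex ids per seed vertex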