Source code for local2global_embedding.network.npgraph

#  Copyright (c) 2021. Lucas G. S. Jeub
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#  SOFTWARE.

import json
from pathlib import Path
from tempfile import TemporaryFile
from random import randrange

import numpy as np
import torch
import numba
from numba.experimental import jitclass

from .graph import Graph
from local2global_embedding import progress


# Module-level NumPy random generator (created without an explicit seed).
# NPGraph.sample_positive_edges draws its random edge positions from it.
rng = np.random.default_rng()


# (field name, numba type) pairs for the edge_index, adj_index and degree arrays.
# NOTE(review): nothing visible in this file references `spec`; the JitGraph
# decorator declares its own inline field list, which additionally contains
# `num_nodes` -- confirm whether this list is still needed.
spec = [
    ('edge_index', numba.int64[:, :]),
    ('adj_index', numba.int64[:]),
    ('degree', numba.int64[:]),

]


class NPGraph(Graph):
    """
    Graph implementation backed by numpy arrays.

    The ``edge_index`` array may be a memory-mapped array.
    """

    @staticmethod
    def _convert_input(input):
        """
        Coerce an input value to a numpy array.

        Args:
            input: ``None``, a torch tensor, or anything accepted by
                ``np.asanyarray``

        Returns:
            ``None`` if ``input`` is ``None``; otherwise a numpy array (torch
            tensors are moved to the CPU before conversion)
        """
        if input is None:
            return None
        data = input.cpu() if isinstance(input, torch.Tensor) else input
        return np.asanyarray(data)
@classmethod
def load(cls, folder, mmap_edges=None, mmap_features=None):
    """
    Build a graph from the ``.npy`` / ``.json`` files stored in a folder.

    ``edge_index.npy`` is required. ``edge_attr.npy``, ``info.json``,
    ``node_feat.npy``, ``node_label.npy`` and ``adj_index.npy`` are used only
    if they exist.

    Args:
        folder: directory containing the saved graph
        mmap_edges: ``mmap_mode`` passed to ``np.load`` for ``edge_index`` and
            ``edge_attr``
        mmap_features: ``mmap_mode`` passed to ``np.load`` for the node features

    Returns:
        ``cls(**kwargs)`` where the keyword arguments are the loaded arrays
        plus the contents of ``info.json``
    """
    root = Path(folder)
    params = {'edge_index': np.load(root / 'edge_index.npy', mmap_mode=mmap_edges)}

    edge_attr_path = root / 'edge_attr.npy'
    if edge_attr_path.is_file():
        params['edge_attr'] = np.load(edge_attr_path, mmap_mode=mmap_edges)

    info_path = root / 'info.json'
    if info_path.is_file():
        with open(info_path) as handle:
            params.update(json.load(handle))

    # Remaining optional arrays: (file name, constructor keyword, mmap mode).
    # They are processed after info.json, so they win on a key collision.
    optional_arrays = (
        ('node_feat.npy', 'x', mmap_features),
        ('node_label.npy', 'y', None),
        ('adj_index.npy', 'adj_index', None),
    )
    for file_name, key, mode in optional_arrays:
        array_path = root / file_name
        if array_path.is_file():
            params[key] = np.load(array_path, mmap_mode=mode)

    return cls(**params)
def save(self, folder):
    """
    Write the graph to ``folder`` in the layout read back by ``load``.

    Files written: ``edge_index.npy``, ``adj_index.npy`` and ``info.json``
    always; ``edge_attr.npy`` only for weighted graphs; ``node_label.npy`` /
    ``node_feat.npy`` only when ``y`` / ``x`` are set.

    Args:
        folder: existing directory to write into
    """
    folder = Path(folder)
    np.save(folder / 'edge_index.npy', self.edge_index)
    if self.weighted:
        np.save(folder / 'edge_attr.npy', self.edge_attr)
    np.save(folder / 'adj_index.npy', self.adj_index)

    # json cannot serialise numpy scalar types, so convert to builtins first
    undir = self.undir if self.undir is None else bool(self.undir)
    info = {'num_nodes': int(self.num_nodes), 'undir': undir}
    with open(folder / 'info.json', 'w') as f:
        json.dump(info, f)

    # np.save takes (file, array); the target path must come first
    if self.y is not None:
        np.save(folder / 'node_label.npy', self.y)
    if self.x is not None:
        np.save(folder / 'node_feat.npy', self.x)
def __init__(self, *args, ensure_sorted=False, **kwargs):
    """
    Initialise the graph and derive adjacency index, degrees and strengths.

    Args:
        *args: forwarded to the parent class initialiser
        ensure_sorted: if ``True``, sort the edges by ``(source, target)``
            before building the adjacency index
        **kwargs: forwarded to the parent class initialiser

    Raises:
        NotImplementedError: if sorting is requested, or ``undir`` has to be
            determined, while ``edge_index`` is a memory-mapped array
    """
    super().__init__(*args, **kwargs)

    if self.num_nodes is None:
        self.num_nodes = np.max(self.edge_index) + 1

    if ensure_sorted:
        if isinstance(self.edge_index, np.memmap):
            raise NotImplementedError("Sorting for memmapped arrays not yet implemented")
        # a single integer key per edge orders by source first, then target
        order = np.argsort(self.edge_index[0]*self.num_nodes + self.edge_index[1])
        self.edge_index = self.edge_index[:, order]
        if self.edge_attr is not None:
            self.edge_attr = self.edge_attr[order]

    self._jitgraph = JitGraph(self.edge_index, self.num_nodes, self.adj_index, None)
    jit_graph = self._jitgraph
    self.adj_index = jit_graph.adj_index
    self.degree = jit_graph.degree
    self.num_nodes = jit_graph.num_nodes

    if self.weighted:
        self.weights = self.edge_attr
        self.strength = np.zeros(self.num_nodes)  #: array of node strength
        # accumulate the weight of every edge onto its source node
        np.add.at(self.strength, self.edge_index[0], self.weights)
    else:
        # broadcast a single 1 instead of allocating one value per edge
        self.weights = np.broadcast_to(np.ones(1), (self.num_edges,))
        self.strength = self.degree

    self.device = 'cpu'

    if self.undir is None:
        if isinstance(self.edge_index, np.memmap):
            raise NotImplementedError("Checking directedness for memmapped arrays not yet implemented")
        # Order the edges by (target, source) and swap the two rows: the result
        # equals edge_index exactly when every edge has its reverse present.
        order = np.argsort(self.edge_index[1]*self.num_nodes + self.edge_index[0])
        reversed_edges = self.edge_index[::-1, order]
        symmetric = np.array_equal(self.edge_index, reversed_edges)
        if self.weighted:
            symmetric = symmetric and np.array_equal(self.weights, self.weights[order])
        self.undir = symmetric
def edges(self):
    """
    iterate over the edges of the graph

    Returns:
        generator yielding one ``(source, target)`` tuple per column of
        ``edge_index``
    """
    endpoint_pairs = self.edge_index.T
    return ((pair[0], pair[1]) for pair in endpoint_pairs)
def edges_weighted(self):
    """
    iterate over the edges of the graph together with their weights

    Returns:
        generator yielding one ``(source, target, weight)`` tuple per edge; if
        a weight entry holds more than one value, its first element is used
    """
    def as_scalar(weight):
        # entries with several values contribute only their first element
        if weight.size > 1:
            return weight[0]
        return weight

    endpoint_pairs = self.edge_index.T
    return ((pair[0], pair[1], as_scalar(weight))
            for pair, weight in zip(endpoint_pairs, self.weights))
def is_edge(self, source, target):
    """
    check whether the graph contains the edge ``(source, target)``

    Args:
        source: index of the source node
        target: index of the target node

    Returns:
        result of the lookup performed by the underlying ``JitGraph``
    """
    found = self._jitgraph.is_edge(source, target)
    return found
def neighbourhood(self, nodes, hops: int = 1):
    """
    find the neighbourhood of a set of source nodes

    note that the neighbourhood includes the source nodes themselves

    Args:
        nodes: indices of source nodes
        hops: number of hops for neighbourhood

    Returns:
        neighbourhood: the source nodes followed by the newly reached nodes
        of each hop (sorted within a hop)
    """
    # explore[v] is True while v has not been added to the neighbourhood.
    # The builtin ``bool`` is used because the ``np.bool`` alias is missing
    # from some NumPy releases.
    explore = np.ones(self.num_nodes, dtype=bool)
    explore[nodes] = False
    all_nodes = nodes
    new_nodes = nodes
    for _ in range(hops):
        if len(new_nodes) == 0:
            # frontier exhausted: np.concatenate would fail on an empty list
            break
        new_nodes = np.concatenate([self.adj(node) for node in new_nodes])
        new_nodes = np.unique(new_nodes[explore[new_nodes]])
        explore[new_nodes] = False
        all_nodes = np.concatenate((all_nodes, new_nodes))
    return all_nodes
def subgraph(self, nodes: torch.Tensor, relabel=False, keep_x=True, keep_y=True):
    """
    find induced subgraph for a set of nodes

    Args:
        nodes: node indices
        relabel: if ``True``, the subgraph is created without node labels;
            otherwise the labels of the selected nodes are carried over
        keep_x: whether to copy the features of the selected nodes
        keep_y: whether to copy the labels (``y``) of the selected nodes

    Returns:
        subgraph (a new instance of the same class)
    """
    nodes = np.asanyarray(nodes)
    # `kept_edges` are positions into the parent's edge arrays
    sub_edge_index, kept_edges = self._jitgraph.subgraph_edges(nodes)

    parent_attr = self.edge_attr
    sub_edge_attr = parent_attr[kept_edges] if parent_attr is not None else None

    node_labels = None if relabel else [self.nodes[n] for n in nodes]
    sub_x = self.x[nodes] if (self.x is not None and keep_x) else None
    sub_y = self.y[nodes] if (self.y is not None and keep_y) else None

    return self.__class__(
        edge_index=sub_edge_index,
        edge_attr=sub_edge_attr,
        num_nodes=len(nodes),
        ensure_sorted=False,
        undir=self.undir,
        nodes=node_labels,
        x=sub_x,
        y=sub_y,
    )
def connected_component_ids(self):
    """
    return the connected-component id of every node

    The computation is delegated to the underlying ``JitGraph``, which
    numbers the components by decreasing size, so id ``0`` marks the
    largest component.

    Returns:
        array with one component id per node
    """
    component_ids = self._jitgraph.connected_component_ids()
    return component_ids
def nodes_in_lcc(self):
    """List all nodes whose connected-component id is 0 (the largest connected component)"""
    component_ids = self.connected_component_ids()
    in_largest = component_ids == 0
    return np.flatnonzero(in_largest)
def bfs_order(self, start=0):
    """
    return nodes in breadth-first-search order

    When the component of the current search is exhausted, the search
    restarts from the lowest-index node that has not been visited yet, so
    every node of the graph appears in the result.

    Args:
        start: index of starting node (default: 0)

    Returns:
        array of node indices
    """
    bfs_list = np.full((self.num_nodes,), -1, dtype=np.int64)
    # Must be a boolean array: it is used as a mask below. With an integer
    # dtype, `new_nodes[not_visited[new_nodes]]` would index by the values 0/1.
    not_visited = np.ones(self.num_nodes, dtype=bool)
    bfs_list[0] = start
    not_visited[start] = False
    append_pointer = 1  # next free slot in bfs_list
    i = 0  # position of the next node to expand
    restart = 0  # all nodes below this index are known to be visited
    while append_pointer < self.num_nodes:
        node = bfs_list[i]
        if node < 0:
            # queue ran empty: continue with the next unvisited node
            for node in range(restart, self.num_nodes):
                if not_visited[node]:
                    break
            restart = node
            bfs_list[i] = node
            not_visited[node] = False
            append_pointer += 1
        i += 1
        new_nodes = self.adj(node)
        new_nodes = new_nodes[not_visited[new_nodes]]
        number_new_nodes = len(new_nodes)
        not_visited[new_nodes] = False
        bfs_list[append_pointer:append_pointer+number_new_nodes] = new_nodes
        append_pointer += number_new_nodes
    return bfs_list
def partition_graph(self, partition, self_loops=True):
    """
    build the graph induced by a partition of the nodes

    Args:
        partition: cluster index for each node
        self_loops: forwarded to ``JitGraph.partition_graph_edges``; controls
            whether edges inside a cluster are counted

    Returns:
        new instance of the same class whose edges connect clusters and whose
        edge attributes are the counts returned by the ``JitGraph``
    """
    cluster_of = np.asanyarray(partition)
    cluster_edges, edge_counts = self._jitgraph.partition_graph_edges(cluster_of, self_loops)
    return self.__class__(edge_index=cluster_edges, edge_attr=edge_counts, undir=self.undir)
def sample_negative_edges(self, num_samples):
    """
    sample node pairs that are not edges of the graph

    Args:
        num_samples: number of pairs to sample

    Returns:
        result of ``JitGraph.sample_negative_edges`` (a ``(2, num_samples)``
        array of node indices)
    """
    sampled = self._jitgraph.sample_negative_edges(num_samples)
    return sampled
def sample_positive_edges(self, num_samples):
    """
    sample existing edges uniformly at random (with replacement)

    Args:
        num_samples: number of edges to sample

    Returns:
        the columns of ``edge_index`` at the sampled positions
    """
    positions = rng.integers(self.num_edges, size=(num_samples,))
    return self.edge_index[:, positions]
@numba.njit
def _subgraph_edges(edge_index, adj_index, degs, num_nodes, sources):
    """
    Collect the edges of the subgraph induced by ``sources``.

    Args:
        edge_index: edge array; row 1 holds the target of each edge
        adj_index: offsets such that the edges of node ``v`` are at positions
            ``adj_index[v]:adj_index[v+1]``
        degs: degree of each node
        num_nodes: number of nodes of the full graph
        sources: indices of the selected nodes

    Returns:
        tuple ``(subgraph_edge_index, index)``: the induced edges expressed
        as positions within ``sources``, and the position of each kept edge
        in ``edge_index``
    """
    # the total degree of the selected nodes bounds the number of kept edges
    capacity = degs[sources].sum()
    sub_edges = np.empty((2, capacity), dtype=np.int64)
    kept = np.empty((capacity,), dtype=np.int64)

    # position of every selected node within `sources`; -1 marks other nodes
    position = np.full((num_nodes,), -1, np.int64)
    position[sources] = np.arange(len(sources))

    n_kept = 0
    for s in range(len(sources)):
        first = adj_index[sources[s]]
        last = adj_index[sources[s]+1]
        for e in range(first, last):
            t = position[edge_index[1, e]]
            if t >= 0:
                sub_edges[0, n_kept] = s
                sub_edges[1, n_kept] = t
                kept[n_kept] = e
                n_kept += 1
    return sub_edges[:, :n_kept], kept[:n_kept]


@numba.njit
def _memmap_degree(edge_index, num_nodes):
    """
    Count how often each node occurs as an edge source.

    Progress is reported through the ``progress`` module once every
    1000000 processed edges.

    Args:
        edge_index: edge array; row 0 holds the source of each edge
        num_nodes: number of nodes

    Returns:
        int64 array with the degree of every node
    """
    counts = np.zeros(num_nodes, dtype=np.int64)
    with numba.objmode:
        print('computing degrees')
        progress.reset(edge_index.shape[1])
    for position, node in enumerate(edge_index[0]):
        counts[node] += 1
        if position > 0 and position % 1000000 == 0:
            with numba.objmode:
                progress.update(1000000)
    with numba.objmode:
        progress.close()
    return counts
@jitclass(
    [
        ('edge_index', numba.int64[:, :]),
        ('adj_index', numba.int64[:]),
        ('degree', numba.int64[:]),
        ('num_nodes', numba.int64)
    ]
)
class JitGraph:
    """
    Compiled graph structure holding the edge array, adjacency offsets,
    node degrees and node count.
    """

    def __init__(self, edge_index, num_nodes=None, adj_index=None, degree=None):
        """
        Args:
            edge_index: ``(2, num_edges)`` int64 array of edges
            num_nodes: number of nodes (default: largest index in
                ``edge_index`` plus one)
            adj_index: offsets of each node's edges within ``edge_index``;
                computed from the source row if not given
            degree: node degrees; derived from the source row or from
                ``adj_index`` if not given
        """
        # Resolved values get their own local names instead of rebinding the
        # optional parameters (presumably to keep numba's type inference
        # happy -- TODO confirm).
        if num_nodes is None:
            node_count = edge_index.max() + 1
        else:
            node_count = num_nodes

        if adj_index is None:
            offsets = np.zeros((node_count+1,), dtype=np.int64)
        else:
            offsets = adj_index

        if degree is None:
            if adj_index is None:
                # count the out-edges per node, then turn the counts into offsets
                degree = np.zeros((node_count,), dtype=np.int64)
                for source in edge_index[0]:
                    degree[source] += 1
                offsets[1:] = degree.cumsum()
            else:
                # the degree is the length of each node's slice of the edge array
                degree = offsets[1:]-offsets[:-1]

        self.edge_index = edge_index
        self.adj_index = offsets
        self.degree = degree
        self.num_nodes = node_count
def is_edge(self, source, target):
    """
    Check whether ``(source, target)`` is an edge.

    Uses a binary search over the targets of ``source``, so the targets of
    each node must be stored in ascending order. Node indices outside
    ``range(num_nodes)`` are never edges.
    """
    if source not in range(self.num_nodes) or target not in range(self.num_nodes):
        return False
    index = np.searchsorted(self.edge_index[1, self.adj_index[source]:self.adj_index[source + 1]], target)
    if index < self.degree[source] and self.edge_index[1, self.adj_index[source] + index] == target:
        return True
    else:
        return False

def sample_negative_edges(self, num_samples):
    """
    Sample ``num_samples`` node pairs that are not edges (rejection sampling).

    Returns:
        ``(2, num_samples)`` int64 array of ``(source, target)`` pairs
    """
    i = 0
    sampled_edges = np.empty((2, num_samples), dtype=np.int64)
    while i < num_samples:
        source = randrange(self.num_nodes)
        target = randrange(self.num_nodes)
        if not self.is_edge(source, target):
            sampled_edges[0, i] = source
            sampled_edges[1, i] = target
            i += 1
    return sampled_edges

def adj(self, node):
    """Return the targets of all edges starting at ``node``."""
    return self.edge_index[1][self.adj_index[node]:self.adj_index[node+1]]

def neighbours(self, nodes):
    """Return the sorted unique neighbours of a collection of nodes."""
    size = self.degree[nodes].sum()
    out = np.empty((size,), dtype=np.int64)
    it = 0
    for node in nodes:
        out[it:it+self.degree[node]] = self.adj(node)
        it += self.degree[node]
    return np.unique(out)

def sample_positive_edges(self, num_samples):
    """
    Sample ``num_samples`` existing edges uniformly with replacement.

    Returns:
        ``(2, num_samples)`` array holding the sampled columns of ``edge_index``
    """
    # np.random.randint takes (low, high, ...). Draw one position at a time
    # with the two-argument form so the size is never mistaken for `high`.
    index = np.empty((num_samples,), dtype=np.int64)
    for k in range(num_samples):
        index[k] = np.random.randint(0, self.num_edges)
    return self.edge_index[:, index]

def subgraph_edges(self, sources):
    """
    Collect the edges of the subgraph induced by ``sources``.

    Returns:
        tuple ``(subgraph_edge_index, index)``: the induced edges expressed
        as positions within ``sources``, and the position of each kept edge
        in ``edge_index``
    """
    # the total degree of the selected nodes bounds the number of kept edges
    max_edges = self.degree[sources].sum()
    subgraph_edge_index = np.empty((2, max_edges), dtype=np.int64)
    index = np.empty((max_edges,), dtype=np.int64)
    # position of every selected node within `sources`; -1 marks other nodes
    target_index = np.full((self.num_nodes,), -1, np.int64)
    target_index[sources] = np.arange(len(sources))
    count = 0
    for s in range(len(sources)):
        for ei in range(self.adj_index[sources[s]], self.adj_index[sources[s]+1]):
            t = target_index[self.edge_index[1][ei]]
            if t >= 0:
                subgraph_edge_index[0, count] = s
                subgraph_edge_index[1, count] = t
                index[count] = ei
                count += 1
    return subgraph_edge_index[:, :count], index[:count]

def subgraph(self, sources):
    """Return the subgraph induced by ``sources`` as a new ``JitGraph``."""
    edge_index, _ = self.subgraph_edges(sources)
    return JitGraph(edge_index, len(sources), None, None)

def partition_graph_edges(self, partition, self_loops):
    """
    Count the edges between the clusters of a partition.

    Args:
        partition: cluster index of every node
        self_loops: if false, edges within a cluster are not counted

    Returns:
        tuple ``(partition_edges, weights)``: the pairs of clusters joined
        by at least one counted edge, and the number of edges for each pair
    """
    num_edges = self.num_edges
    with numba.objmode:
        print('finding partition edges')
        progress.reset(num_edges)
    num_clusters = partition.max() + 1
    edge_counts = np.zeros((num_clusters, num_clusters), dtype=np.int64)
    for i, (source, target) in enumerate(self.edge_index.T):
        source = partition[source]
        target = partition[target]
        if self_loops or (source != target):
            edge_counts[source, target] += 1
        if i % 1000000 == 0 and i > 0:
            with numba.objmode:
                progress.update(1000000)
    with numba.objmode:
        progress.close()
    index = np.nonzero(edge_counts)
    partition_edges = np.vstack(index)
    weights = np.empty((len(index[0]),), dtype=np.int64)
    for it, (i, j) in enumerate(zip(*index)):
        weights[it] = edge_counts[i][j]
    return partition_edges, weights

def partition_graph(self, partition, self_loops):
    """Return the (unweighted) graph between the clusters of ``partition``."""
    edge_index, _ = self.partition_graph_edges(partition, self_loops)
    return JitGraph(edge_index, None, None, None)

def connected_component_ids(self):
    """
    Return the connected-component id of every node.

    Components are numbered by decreasing size, so id ``0`` marks the
    largest component.

    Returns:
        int64 array with one component id per node
    """
    components = np.full((self.num_nodes,), -1, dtype=np.int64)
    # np.bool_ is used because the plain np.bool alias is missing from some
    # NumPy releases
    not_visited = np.ones(self.num_nodes, dtype=np.bool_)
    component_id = 0
    components[0] = component_id
    not_visited[0] = False
    bfs_list = [0]
    i = 0
    for _ in range(self.num_nodes):
        if bfs_list:
            node = bfs_list.pop()
        else:
            # current component exhausted: start a new one at the next
            # unvisited node (the scan resumes where the previous one stopped)
            component_id += 1
            for i in range(i, self.num_nodes):
                if not_visited[i]:
                    break
            node = i
            not_visited[node] = False
        components[node] = component_id
        new_nodes = self.adj(node)
        new_nodes = new_nodes[not_visited[new_nodes]]
        not_visited[new_nodes] = False
        bfs_list.extend(new_nodes)

    # relabel the components so that ids are ordered by decreasing size
    num_components = components.max()+1
    component_size = np.zeros((num_components,), dtype=np.int64)
    for i in components:
        component_size[i] += 1
    new_id = np.argsort(component_size)[::-1]
    inverse = np.empty_like(new_id)
    inverse[new_id] = np.arange(num_components)
    return inverse[components]

def nodes_in_lcc(self):
    """List all nodes in the largest connected component"""
    return np.flatnonzero(self.connected_component_ids() == 0)

@property
def num_edges(self):
    """Number of edges (columns of ``edge_index``)."""
    return self.edge_index.shape[1]