Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add forest generation #146

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions cyaron/graph.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from .utils import *
from .vector import Vector
import random
from typing import TypeVar, Callable


__all__ = ["Edge", "Graph"]


Expand Down Expand Up @@ -46,7 +46,7 @@ def __init__(self, point_count, directed=False):
"""
self.directed = directed
self.edges = [[] for i in range(point_count + 1)]

def edge_count(self):
"""edge_count(self) -> int
Return the count of the edges in the graph.
Expand Down Expand Up @@ -292,9 +292,11 @@ def graph(point_count, edge_count, **kwargs):
self_loop = kwargs.get("self_loop", True)
repeated_edges = kwargs.get("repeated_edges", True)
if not repeated_edges:
max_edge = Graph._calc_max_edge(point_count, directed, self_loop)
max_edge = Graph._calc_max_edge(point_count, directed, self_loop)
if edge_count > max_edge:
raise Exception("the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d." % (point_count, max_edge))
raise Exception(
"the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d."
% (point_count, max_edge))

weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
Expand Down Expand Up @@ -349,9 +351,11 @@ def DAG(point_count, edge_count, **kwargs):
repeated_edges = kwargs.get("repeated_edges", True)
loop = kwargs.get("loop", False)
if not repeated_edges:
max_edge = Graph._calc_max_edge(point_count, not loop, self_loop)
max_edge = Graph._calc_max_edge(point_count, not loop, self_loop)
if edge_count > max_edge:
raise Exception("the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d." % (point_count, max_edge))
raise Exception(
"the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d."
% (point_count, max_edge))

weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
Expand Down Expand Up @@ -418,9 +422,11 @@ def UDAG(point_count, edge_count, **kwargs):
self_loop = kwargs.get("self_loop", True)
repeated_edges = kwargs.get("repeated_edges", True)
if not repeated_edges:
max_edge = Graph._calc_max_edge(point_count, False, self_loop)
max_edge = Graph._calc_max_edge(point_count, False, self_loop)
if edge_count > max_edge:
raise Exception("the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d." % (point_count, max_edge))
raise Exception(
"the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d."
% (point_count, max_edge))

weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
Expand Down Expand Up @@ -456,7 +462,7 @@ def UDAG(point_count, edge_count, **kwargs):
i += 1

return graph

@staticmethod
def connected(point_count, edge_count, directed=False, **kwargs):
"""connected(point_count, edge_count, **kwargs) -> Graph
Expand Down Expand Up @@ -519,7 +525,7 @@ def hack_spfa(point_count, **kwargs):
graph.add_edge(u, v, weight=weight_gen())

return graph

@staticmethod
def _calc_max_edge(point_count, directed, self_loop):
max_edge = point_count * (point_count - 1)
Expand All @@ -529,6 +535,22 @@ def _calc_max_edge(point_count, directed, self_loop):
max_edge += point_count
return max_edge

@staticmethod
def forest(point_count, tree_count, **kwargs):
if tree_count <= 0 or tree_count > point_count:
raise ValueError("tree_count must be between 1 and point_count")
tree = list(Graph.tree(point_count, **kwargs).iterate_edges())
need_delete = set(
i[0] for i in (Vector.random_unique_vector(tree_count - 1, [(
0, point_count - 2)]) if tree_count > 1 else []))
result = Graph(point_count, 0)
for i in range(point_count - 1):
if i not in need_delete:
result.add_edge(tree[i].start,
tree[i].end,
weight=tree[i].weight)
return result


class GraphMatrix:
"""
Expand Down Expand Up @@ -557,7 +579,8 @@ def __init__(self,
self.matrix[edge.start][edge.end], edge)

def __str__(self):
return '\n'.join([' '.join(map(str, row[1:])) for row in self.matrix[1:]])
return '\n'.join(
[' '.join(map(str, row[1:])) for row in self.matrix[1:]])

def __call__(self, u: int, v: int):
return self.matrix[u][v]
Expand Down
127 changes: 90 additions & 37 deletions cyaron/tests/graph_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import unittest
from cyaron import Graph
from random import randint


class UnionFindSet:

def __init__(self, size):
self.father = [0] + [i + 1 for i in range(size)]

Expand All @@ -23,15 +25,17 @@ def test_same(self, l, r):


def tarjan(graph, n):

def new_array(len, val=0):
return [val for _ in range(len+1)]
return [val for _ in range(len + 1)]

instack = new_array(n, False)
low = new_array(n)
dfn = new_array(n, 0)
stap = new_array(n)
belong = new_array(n)
var = [0, 0, 0] # cnt, bc, stop
var = [0, 0, 0] # cnt, bc, stop

# cnt = bc = stop = 0

def dfs(cur):
Expand All @@ -49,7 +53,7 @@ def dfs(cur):
low[cur] = min(low[cur], dfn[v.end])

if dfn[cur] == low[cur]:
v = cur + 1 # set v != cur
v = cur + 1 # set v != cur
var[1] += 1
while v != cur:
var[2] -= 1
Expand All @@ -58,8 +62,8 @@ def dfs(cur):
belong[v] = var[1]

for i in range(n):
if dfn[i+1] == 0:
dfs(i+1)
if dfn[i + 1] == 0:
dfs(i + 1)

return belong

Expand All @@ -69,28 +73,38 @@ class TestGraph(unittest.TestCase):
def test_self_loop(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=True)
has_self_loop = max([e.start == e.end for e in graph.iterate_edges()])
graph = Graph.graph(graph_size,
int(graph_size * 2),
self_loop=True)
has_self_loop = max(
[e.start == e.end for e in graph.iterate_edges()])
if has_self_loop:
break
self.assertTrue(has_self_loop)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=False)
self.assertFalse(max([e.start == e.end for e in graph.iterate_edges()]))
graph = Graph.graph(graph_size,
int(graph_size * 2),
self_loop=False)
self.assertFalse(
max([e.start == e.end for e in graph.iterate_edges()]))

def test_repeated_edges(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=True)
graph = Graph.graph(graph_size,
int(graph_size * 2),
repeated_edges=True)
edges = [(e.start, e.end) for e in graph.iterate_edges()]
has_repeated_edges = len(edges) > len(set(edges))
if has_repeated_edges:
break
self.assertTrue(has_repeated_edges)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=False)
graph = Graph.graph(graph_size,
int(graph_size * 2),
repeated_edges=False)
edges = [(e.start, e.end) for e in graph.iterate_edges()]
self.assertEqual(len(edges), len(set(edges)))

Expand All @@ -101,60 +115,78 @@ def test_tree_connected(self):
tree = Graph.tree(graph_size)
for edge in tree.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

def test_DAG(self):
graph_size = 20
for _ in range(10): # test 10 times
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=True)
graph = Graph.DAG(graph_size,
int(graph_size * 1.6),
repeated_edges=False,
self_loop=False,
loop=True)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))
self.assertEqual(len(list(graph.iterate_edges())),
int(graph_size * 1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))
for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

def test_DAG_without_loop(self):
graph_size = 20
for _ in range(10): # test 10 times
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=False)
graph = Graph.DAG(graph_size,
int(graph_size * 1.6),
repeated_edges=False,
self_loop=False,
loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))
self.assertEqual(len(list(graph.iterate_edges())),
int(graph_size * 1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))
for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

belong = tarjan(graph, graph_size)
self.assertEqual(max(belong), graph_size)

def test_undirected_graph(self):
graph_size = 20
for _ in range(10): # test 10 times
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.UDAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False)
graph = Graph.UDAG(graph_size,
int(graph_size * 1.6),
repeated_edges=False,
self_loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))
self.assertEqual(len(list(graph.iterate_edges())),
int(graph_size * 1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))
for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

def test_DAG_boundary(self):
with self.assertRaises(Exception, msg="the number of edges of connected graph must more than the number of nodes - 1"):
with self.assertRaises(
Exception,
msg=
"the number of edges of connected graph must more than the number of nodes - 1"
):
Graph.DAG(8, 6)
Graph.DAG(8, 7)

def test_GraphMatrix(self):
g = Graph(3, True)
edge_set = [(2, 3, 3), (3, 3, 1), (2, 3, 7), (2, 3, 4), (3, 2, 1), (1, 3, 3)]
edge_set = [(2, 3, 3), (3, 3, 1), (2, 3, 7), (2, 3, 4), (3, 2, 1),
(1, 3, 3)]
for u, v, w in edge_set:
g.add_edge(u, v, weight=w)
self.assertEqual(str(g.to_matrix()), "-1 -1 3\n-1 -1 4\n-1 1 1")
Expand All @@ -166,9 +198,30 @@ def test_GraphMatrix(self):
merge2 = lambda val, edge: max(edge.weight, val)
merge3 = lambda val, edge: min(edge.weight, val)
merge4 = lambda val, edge: gcd(val, edge.weight)
merge5 = lambda val, edge: lcm(val, edge.weight) if val else edge.weight
self.assertEqual(str(g.to_matrix(merge=merge1)), "-1 -1 3\n-1 -1 3\n-1 1 1")
self.assertEqual(str(g.to_matrix(merge=merge2)), "-1 -1 3\n-1 -1 7\n-1 1 1")
self.assertEqual(str(g.to_matrix(default=9, merge=merge3)), "9 9 3\n9 9 3\n9 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge4)), "0 0 3\n0 0 1\n0 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge5)), "0 0 3\n0 0 84\n0 1 1")
merge5 = lambda val, edge: lcm(val, edge.weight
) if val else edge.weight
self.assertEqual(str(g.to_matrix(merge=merge1)),
"-1 -1 3\n-1 -1 3\n-1 1 1")
self.assertEqual(str(g.to_matrix(merge=merge2)),
"-1 -1 3\n-1 -1 7\n-1 1 1")
self.assertEqual(str(g.to_matrix(default=9, merge=merge3)),
"9 9 3\n9 9 3\n9 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge4)),
"0 0 3\n0 0 1\n0 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge5)),
"0 0 3\n0 0 84\n0 1 1")

def test_forest(self):
for i in range(10):
size = randint(1, 100)
part_count = randint(1, size)
forest = Graph.forest(size, part_count)
dsu = UnionFindSet(size)
for edge in forest.iterate_edges():
self.assertFalse(dsu.test_same(edge.start, edge.end))
dsu.merge(edge.start, edge.end)
count = 0
for i in range(1, size + 1):
if dsu.get_father(i) == i:
count += 1
self.assertEqual(count, part_count)
Loading