Skip to content

Commit

Permalink
Add forest generation
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr-Python-in-China committed Oct 27, 2024
1 parent b4b5122 commit fce5c35
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 48 deletions.
45 changes: 34 additions & 11 deletions cyaron/graph.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from .utils import *
from .vector import Vector
import random
from typing import TypeVar, Callable


__all__ = ["Edge", "Graph"]


Expand Down Expand Up @@ -46,7 +46,7 @@ def __init__(self, point_count, directed=False):
"""
self.directed = directed
self.edges = [[] for i in range(point_count + 1)]

def edge_count(self):
"""edge_count(self) -> int
Return the count of the edges in the graph.
Expand Down Expand Up @@ -292,9 +292,11 @@ def graph(point_count, edge_count, **kwargs):
self_loop = kwargs.get("self_loop", True)
repeated_edges = kwargs.get("repeated_edges", True)
if not repeated_edges:
max_edge = Graph._calc_max_edge(point_count, directed, self_loop)
max_edge = Graph._calc_max_edge(point_count, directed, self_loop)
if edge_count > max_edge:
raise Exception("the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d." % (point_count, max_edge))
raise Exception(
"the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d."
% (point_count, max_edge))

weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
Expand Down Expand Up @@ -349,9 +351,11 @@ def DAG(point_count, edge_count, **kwargs):
repeated_edges = kwargs.get("repeated_edges", True)
loop = kwargs.get("loop", False)
if not repeated_edges:
max_edge = Graph._calc_max_edge(point_count, not loop, self_loop)
max_edge = Graph._calc_max_edge(point_count, not loop, self_loop)
if edge_count > max_edge:
raise Exception("the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d." % (point_count, max_edge))
raise Exception(
"the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d."
% (point_count, max_edge))

weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
Expand Down Expand Up @@ -418,9 +422,11 @@ def UDAG(point_count, edge_count, **kwargs):
self_loop = kwargs.get("self_loop", True)
repeated_edges = kwargs.get("repeated_edges", True)
if not repeated_edges:
max_edge = Graph._calc_max_edge(point_count, False, self_loop)
max_edge = Graph._calc_max_edge(point_count, False, self_loop)
if edge_count > max_edge:
raise Exception("the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d." % (point_count, max_edge))
raise Exception(
"the number of edges of this kind of graph which has %d vertexes must be less than or equal to %d."
% (point_count, max_edge))

weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
Expand Down Expand Up @@ -456,7 +462,7 @@ def UDAG(point_count, edge_count, **kwargs):
i += 1

return graph

@staticmethod
def connected(point_count, edge_count, directed=False, **kwargs):
"""connected(point_count, edge_count, **kwargs) -> Graph
Expand Down Expand Up @@ -519,7 +525,7 @@ def hack_spfa(point_count, **kwargs):
graph.add_edge(u, v, weight=weight_gen())

return graph

@staticmethod
def _calc_max_edge(point_count, directed, self_loop):
max_edge = point_count * (point_count - 1)
Expand All @@ -529,6 +535,22 @@ def _calc_max_edge(point_count, directed, self_loop):
max_edge += point_count
return max_edge

@staticmethod
def forest(point_count, tree_count, **kwargs):
if tree_count <= 0 or tree_count > point_count:
raise ValueError("tree_count must be between 1 and point_count")
tree = list(Graph.tree(point_count, **kwargs).iterate_edges())
need_delete = set(
i[0] for i in (Vector.random_unique_vector(tree_count - 1, [(
0, point_count - 2)]) if tree_count > 1 else []))
result = Graph(point_count, 0)
for i in range(point_count - 1):
if i not in need_delete:
result.add_edge(tree[i].start,
tree[i].end,
weight=tree[i].weight)
return result


class GraphMatrix:
"""
Expand Down Expand Up @@ -557,7 +579,8 @@ def __init__(self,
self.matrix[edge.start][edge.end], edge)

def __str__(self):
return '\n'.join([' '.join(map(str, row[1:])) for row in self.matrix[1:]])
return '\n'.join(
[' '.join(map(str, row[1:])) for row in self.matrix[1:]])

def __call__(self, u: int, v: int):
return self.matrix[u][v]
Expand Down
127 changes: 90 additions & 37 deletions cyaron/tests/graph_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import unittest
from cyaron import Graph
from random import randint


class UnionFindSet:

def __init__(self, size):
self.father = [0] + [i + 1 for i in range(size)]

Expand All @@ -23,15 +25,17 @@ def test_same(self, l, r):


def tarjan(graph, n):

def new_array(len, val=0):
return [val for _ in range(len+1)]
return [val for _ in range(len + 1)]

instack = new_array(n, False)
low = new_array(n)
dfn = new_array(n, 0)
stap = new_array(n)
belong = new_array(n)
var = [0, 0, 0] # cnt, bc, stop
var = [0, 0, 0] # cnt, bc, stop

# cnt = bc = stop = 0

def dfs(cur):
Expand All @@ -49,7 +53,7 @@ def dfs(cur):
low[cur] = min(low[cur], dfn[v.end])

if dfn[cur] == low[cur]:
v = cur + 1 # set v != cur
v = cur + 1 # set v != cur
var[1] += 1
while v != cur:
var[2] -= 1
Expand All @@ -58,8 +62,8 @@ def dfs(cur):
belong[v] = var[1]

for i in range(n):
if dfn[i+1] == 0:
dfs(i+1)
if dfn[i + 1] == 0:
dfs(i + 1)

return belong

Expand All @@ -69,28 +73,38 @@ class TestGraph(unittest.TestCase):
def test_self_loop(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=True)
has_self_loop = max([e.start == e.end for e in graph.iterate_edges()])
graph = Graph.graph(graph_size,
int(graph_size * 2),
self_loop=True)
has_self_loop = max(
[e.start == e.end for e in graph.iterate_edges()])
if has_self_loop:
break
self.assertTrue(has_self_loop)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=False)
self.assertFalse(max([e.start == e.end for e in graph.iterate_edges()]))
graph = Graph.graph(graph_size,
int(graph_size * 2),
self_loop=False)
self.assertFalse(
max([e.start == e.end for e in graph.iterate_edges()]))

def test_repeated_edges(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=True)
graph = Graph.graph(graph_size,
int(graph_size * 2),
repeated_edges=True)
edges = [(e.start, e.end) for e in graph.iterate_edges()]
has_repeated_edges = len(edges) > len(set(edges))
if has_repeated_edges:
break
self.assertTrue(has_repeated_edges)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=False)
graph = Graph.graph(graph_size,
int(graph_size * 2),
repeated_edges=False)
edges = [(e.start, e.end) for e in graph.iterate_edges()]
self.assertEqual(len(edges), len(set(edges)))

Expand All @@ -101,60 +115,78 @@ def test_tree_connected(self):
tree = Graph.tree(graph_size)
for edge in tree.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

def test_DAG(self):
graph_size = 20
for _ in range(10): # test 10 times
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=True)
graph = Graph.DAG(graph_size,
int(graph_size * 1.6),
repeated_edges=False,
self_loop=False,
loop=True)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))
self.assertEqual(len(list(graph.iterate_edges())),
int(graph_size * 1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))
for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

def test_DAG_without_loop(self):
graph_size = 20
for _ in range(10): # test 10 times
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=False)
graph = Graph.DAG(graph_size,
int(graph_size * 1.6),
repeated_edges=False,
self_loop=False,
loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))
self.assertEqual(len(list(graph.iterate_edges())),
int(graph_size * 1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))
for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

belong = tarjan(graph, graph_size)
self.assertEqual(max(belong), graph_size)

def test_undirected_graph(self):
graph_size = 20
for _ in range(10): # test 10 times
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.UDAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False)
graph = Graph.UDAG(graph_size,
int(graph_size * 1.6),
repeated_edges=False,
self_loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))
self.assertEqual(len(list(graph.iterate_edges())),
int(graph_size * 1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))
for i in range(graph_size - 1):
self.assertTrue(ufs.test_same(i + 1, i + 2))

def test_DAG_boundary(self):
with self.assertRaises(Exception, msg="the number of edges of connected graph must more than the number of nodes - 1"):
with self.assertRaises(
Exception,
msg=
"the number of edges of connected graph must more than the number of nodes - 1"
):
Graph.DAG(8, 6)
Graph.DAG(8, 7)

def test_GraphMatrix(self):
g = Graph(3, True)
edge_set = [(2, 3, 3), (3, 3, 1), (2, 3, 7), (2, 3, 4), (3, 2, 1), (1, 3, 3)]
edge_set = [(2, 3, 3), (3, 3, 1), (2, 3, 7), (2, 3, 4), (3, 2, 1),
(1, 3, 3)]
for u, v, w in edge_set:
g.add_edge(u, v, weight=w)
self.assertEqual(str(g.to_matrix()), "-1 -1 3\n-1 -1 4\n-1 1 1")
Expand All @@ -166,9 +198,30 @@ def test_GraphMatrix(self):
merge2 = lambda val, edge: max(edge.weight, val)
merge3 = lambda val, edge: min(edge.weight, val)
merge4 = lambda val, edge: gcd(val, edge.weight)
merge5 = lambda val, edge: lcm(val, edge.weight) if val else edge.weight
self.assertEqual(str(g.to_matrix(merge=merge1)), "-1 -1 3\n-1 -1 3\n-1 1 1")
self.assertEqual(str(g.to_matrix(merge=merge2)), "-1 -1 3\n-1 -1 7\n-1 1 1")
self.assertEqual(str(g.to_matrix(default=9, merge=merge3)), "9 9 3\n9 9 3\n9 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge4)), "0 0 3\n0 0 1\n0 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge5)), "0 0 3\n0 0 84\n0 1 1")
merge5 = lambda val, edge: lcm(val, edge.weight
) if val else edge.weight
self.assertEqual(str(g.to_matrix(merge=merge1)),
"-1 -1 3\n-1 -1 3\n-1 1 1")
self.assertEqual(str(g.to_matrix(merge=merge2)),
"-1 -1 3\n-1 -1 7\n-1 1 1")
self.assertEqual(str(g.to_matrix(default=9, merge=merge3)),
"9 9 3\n9 9 3\n9 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge4)),
"0 0 3\n0 0 1\n0 1 1")
self.assertEqual(str(g.to_matrix(default=0, merge=merge5)),
"0 0 3\n0 0 84\n0 1 1")

def test_forest(self):
for i in range(10):
size = randint(1, 100)
part_count = randint(1, size)
forest = Graph.forest(size, part_count)
dsu = UnionFindSet(size)
for edge in forest.iterate_edges():
self.assertFalse(dsu.test_same(edge.start, edge.end))
dsu.merge(edge.start, edge.end)
count = 0
for i in range(1, size + 1):
if dsu.get_father(i) == i:
count += 1
self.assertEqual(count, part_count)

0 comments on commit fce5c35

Please sign in to comment.