Skip to content

Commit

Permalink
Merge pull request #45 from YanWQ-monad/master
Browse files Browse the repository at this point in the history
Add DAG generator
  • Loading branch information
Toto Lin authored Sep 6, 2018
2 parents c895c1d + e89460c commit 3c072e9
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 1 deletion.
116 changes: 116 additions & 0 deletions cyaron/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,122 @@ def graph(point_count, edge_count, **kwargs):
i += 1
return graph

@staticmethod
def DAG(point_count, edge_count, **kwargs):
"""DAG(point_count, edge_count, **kwargs) -> Graph
Factory method. Return a graph with point_count vertexes and edge_count edges.
int point_count -> the count of vertexes
int edge_count -> the count of edges
**kwargs(Keyword args):
bool self_loop = False -> whether to allow self loops or not
bool repeated_edges = True -> whether to allow repeated edges or not
bool loop = False -> whether to allow loops or not
(int,int) weight_limit = (1,1) -> the limit of weight. index 0 is the min limit, and index 1 is the max limit(both included)
int weight_limit -> If you use a int for this arg, it means the max limit of the weight(included)
int/float weight_gen()
= lambda: random.randint(weight_limit[0], weight_limit[1])
-> the generator of the weights. It should return the weight. The default way is to use the random.randint()
"""
if edge_count < point_count - 1:
raise Exception("the number of edges of connected graph must more than the number of nodes - 1")

self_loop = kwargs.get("self_loop", False) # DAG default has no loop
repeated_edges = kwargs.get("repeated_edges", True)
loop = kwargs.get("loop", False)
weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
weight_limit = (1, weight_limit)
weight_gen = kwargs.get(
"weight_gen", lambda: random.randint(
weight_limit[0], weight_limit[1]))

used_edges = set()
edge_buf = list(Graph.tree(point_count, weight_limit=weight_gen()).iterate_edges())
graph = Graph(point_count, directed=True)

for edge in edge_buf:
if loop and random.randint(1, 2) == 1:
edge.start, edge.end = edge.end, edge.start
graph.add_edge(edge.start, edge.end, weight=edge.weight)

if not repeated_edges:
used_edges.add((edge.start, edge.end))

i = point_count - 1
while i < edge_count:
u = random.randint(1, point_count)
v = random.randint(1, point_count)

if not loop and u > v:
u, v = v, u

if (not self_loop and u == v) or (not repeated_edges and (u, v) in used_edges):
# Then we generate a new pair of nodes
continue

graph.add_edge(u, v, weight=weight_gen())

if not repeated_edges:
used_edges.add((u, v))

i += 1

return graph

@staticmethod
def UDAG(point_count, edge_count, **kwargs):
"""UDAG(point_count, edge_count, **kwargs) -> Graph
Factory method. Return a graph with point_count vertexes and edge_count edges.
int point_count -> the count of vertexes
int edge_count -> the count of edges
**kwargs(Keyword args):
bool self_loop = True -> whether to allow self loops or not
bool repeated_edges = True -> whether to allow repeated edges or not
(int,int) weight_limit = (1,1) -> the limit of weight. index 0 is the min limit, and index 1 is the max limit(both included)
int weight_limit -> If you use a int for this arg, it means the max limit of the weight(included)
int/float weight_gen()
= lambda: random.randint(weight_limit[0], weight_limit[1])
-> the generator of the weights. It should return the weight. The default way is to use the random.randint()
"""
if edge_count < point_count - 1:
raise Exception("the number of edges of connected graph must more than the number of nodes - 1")

self_loop = kwargs.get("self_loop", True)
repeated_edges = kwargs.get("repeated_edges", True)
weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
weight_limit = (1, weight_limit)
weight_gen = kwargs.get(
"weight_gen", lambda: random.randint(
weight_limit[0], weight_limit[1]))

used_edges = set()
graph = Graph.tree(point_count, weight_limit=weight_gen(), directed=False)

for edge in graph.iterate_edges():
if not repeated_edges:
used_edges.add((edge.start, edge.end))
used_edges.add((edge.end, edge.start))

i = point_count - 1
while i < edge_count:
u = random.randint(1, point_count)
v = random.randint(1, point_count)

if (not self_loop and u == v) or (not repeated_edges and (u, v) in used_edges):
# Then we generate a new pair of nodes
continue

graph.add_edge(u, v, weight=weight_gen())

if not repeated_edges:
used_edges.add((u, v))
used_edges.add((v, u))

i += 1

return graph

@staticmethod
def hack_spfa(point_count, **kwargs):
"""hack_spfa(point_count, **kwargs) -> None
Expand Down
3 changes: 2 additions & 1 deletion cyaron/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
from .io_test import TestIO
from .str_test import TestString
from .polygon_test import TestPolygon
from .compare_test import TestCompare
from .compare_test import TestCompare
from .graph_test import TestGraph
153 changes: 153 additions & 0 deletions cyaron/tests/graph_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import unittest
from cyaron import Graph


class UnionFindSet:
def __init__(self, size):
self.father = [0] + [i + 1 for i in range(size)]

def get_father(self, node):
if self.father[node] == node:
return node
else:
self.father[node] = self.get_father(self.father[node])
return self.father[node]

def merge(self, l, r):
l = self.get_father(l)
r = self.get_father(r)
self.father[l] = r

def test_same(self, l, r):
return self.get_father(l) == self.get_father(r)


def tarjan(graph, n):
def new_array(len, val=0):
return [val for _ in range(len+1)]

instack = new_array(n, False)
low = new_array(n)
dfn = new_array(n, 0)
stap = new_array(n)
belong = new_array(n)
var = [0, 0, 0] # cnt, bc, stop
# cnt = bc = stop = 0

def dfs(cur):
var[0] += 1
dfn[cur] = low[cur] = var[0]
instack[cur] = True
stap[var[2]] = cur
var[2] += 1

for v in graph.edges[cur]:
if dfn[v.end] == 0:
dfs(v.end)
low[cur] = min(low[cur], low[v.end])
elif instack[v.end]:
low[cur] = min(low[cur], dfn[v.end])

if dfn[cur] == low[cur]:
v = cur + 1 # set v != cur
var[1] += 1
while v != cur:
var[2] -= 1
v = stap[var[2]]
instack[v] = False
belong[v] = var[1]

for i in range(n):
if dfn[i+1] == 0:
dfs(i+1)

return belong


class TestGraph(unittest.TestCase):

def test_self_loop(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=True)
has_self_loop = max([e.start == e.end for e in graph.iterate_edges()])
if has_self_loop:
break
self.assertTrue(has_self_loop)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=False)
self.assertFalse(max([e.start == e.end for e in graph.iterate_edges()]))

def test_repeated_edges(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=True)
edges = [(e.start, e.end) for e in graph.iterate_edges()]
has_repeated_edges = len(edges) > len(set(edges))
if has_repeated_edges:
break
self.assertTrue(has_repeated_edges)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=False)
edges = list(graph.iterate_edges())
self.assertEqual(len(edges), len(set(edges)))

def test_tree_connected(self):
graph_size = 20
for _ in range(20):
ufs = UnionFindSet(graph_size)
tree = Graph.tree(graph_size)
for edge in tree.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))


def test_DAG(self):
graph_size = 20
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=True)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

def test_DAG_without_loop(self):
graph_size = 20
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

belong = tarjan(graph, graph_size)
self.assertEqual(max(belong), graph_size)

def test_undirected_graph(self):
graph_size = 20
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.UDAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

def test_DAG_boundary(self):
with self.assertRaises(Exception, msg="the number of edges of connected graph must more than the number of nodes - 1"):
Graph.DAG(8, 6)
Graph.DAG(8, 7)

0 comments on commit 3c072e9

Please sign in to comment.