Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DAG generator #45

Merged
merged 5 commits into from
Sep 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions cyaron/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,122 @@ def graph(point_count, edge_count, **kwargs):
i += 1
return graph

@staticmethod
def DAG(point_count, edge_count, **kwargs):
"""DAG(point_count, edge_count, **kwargs) -> Graph
Factory method. Return a graph with point_count vertexes and edge_count edges.
int point_count -> the count of vertexes
int edge_count -> the count of edges
**kwargs(Keyword args):
bool self_loop = False -> whether to allow self loops or not
bool repeated_edges = True -> whether to allow repeated edges or not
bool loop = False -> whether to allow loops or not
(int,int) weight_limit = (1,1) -> the limit of weight. index 0 is the min limit, and index 1 is the max limit(both included)
int weight_limit -> If you use a int for this arg, it means the max limit of the weight(included)
int/float weight_gen()
= lambda: random.randint(weight_limit[0], weight_limit[1])
-> the generator of the weights. It should return the weight. The default way is to use the random.randint()
"""
if edge_count < point_count - 1:
raise Exception("the number of edges of connected graph must more than the number of nodes - 1")

self_loop = kwargs.get("self_loop", False) # DAG default has no loop
repeated_edges = kwargs.get("repeated_edges", True)
loop = kwargs.get("loop", False)
weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
weight_limit = (1, weight_limit)
weight_gen = kwargs.get(
"weight_gen", lambda: random.randint(
weight_limit[0], weight_limit[1]))

used_edges = set()
edge_buf = list(Graph.tree(point_count, weight_limit=weight_gen()).iterate_edges())
graph = Graph(point_count, directed=True)

for edge in edge_buf:
if loop and random.randint(1, 2) == 1:
edge.start, edge.end = edge.end, edge.start
graph.add_edge(edge.start, edge.end, weight=edge.weight)

if not repeated_edges:
used_edges.add((edge.start, edge.end))

i = point_count - 1
while i < edge_count:
u = random.randint(1, point_count)
v = random.randint(1, point_count)

if not loop and u > v:
u, v = v, u

if (not self_loop and u == v) or (not repeated_edges and (u, v) in used_edges):
# Then we generate a new pair of nodes
continue

graph.add_edge(u, v, weight=weight_gen())

if not repeated_edges:
used_edges.add((u, v))

i += 1

return graph

@staticmethod
def UDAG(point_count, edge_count, **kwargs):
"""UDAG(point_count, edge_count, **kwargs) -> Graph
Factory method. Return a graph with point_count vertexes and edge_count edges.
int point_count -> the count of vertexes
int edge_count -> the count of edges
**kwargs(Keyword args):
bool self_loop = True -> whether to allow self loops or not
bool repeated_edges = True -> whether to allow repeated edges or not
(int,int) weight_limit = (1,1) -> the limit of weight. index 0 is the min limit, and index 1 is the max limit(both included)
int weight_limit -> If you use a int for this arg, it means the max limit of the weight(included)
int/float weight_gen()
= lambda: random.randint(weight_limit[0], weight_limit[1])
-> the generator of the weights. It should return the weight. The default way is to use the random.randint()
"""
if edge_count < point_count - 1:
raise Exception("the number of edges of connected graph must more than the number of nodes - 1")

self_loop = kwargs.get("self_loop", True)
repeated_edges = kwargs.get("repeated_edges", True)
weight_limit = kwargs.get("weight_limit", (1, 1))
if not list_like(weight_limit):
weight_limit = (1, weight_limit)
weight_gen = kwargs.get(
"weight_gen", lambda: random.randint(
weight_limit[0], weight_limit[1]))

used_edges = set()
graph = Graph.tree(point_count, weight_limit=weight_gen(), directed=False)

for edge in graph.iterate_edges():
if not repeated_edges:
used_edges.add((edge.start, edge.end))
used_edges.add((edge.end, edge.start))

i = point_count - 1
while i < edge_count:
u = random.randint(1, point_count)
v = random.randint(1, point_count)

if (not self_loop and u == v) or (not repeated_edges and (u, v) in used_edges):
# Then we generate a new pair of nodes
continue

graph.add_edge(u, v, weight=weight_gen())

if not repeated_edges:
used_edges.add((u, v))
used_edges.add((v, u))

i += 1

return graph

@staticmethod
def hack_spfa(point_count, **kwargs):
"""hack_spfa(point_count, **kwargs) -> None
Expand Down
3 changes: 2 additions & 1 deletion cyaron/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
from .io_test import TestIO
from .str_test import TestString
from .polygon_test import TestPolygon
from .compare_test import TestCompare
from .compare_test import TestCompare
from .graph_test import TestGraph
153 changes: 153 additions & 0 deletions cyaron/tests/graph_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import unittest
from cyaron import Graph


class UnionFindSet:
def __init__(self, size):
self.father = [0] + [i + 1 for i in range(size)]

def get_father(self, node):
if self.father[node] == node:
return node
else:
self.father[node] = self.get_father(self.father[node])
return self.father[node]

def merge(self, l, r):
l = self.get_father(l)
r = self.get_father(r)
self.father[l] = r

def test_same(self, l, r):
return self.get_father(l) == self.get_father(r)


def tarjan(graph, n):
def new_array(len, val=0):
return [val for _ in range(len+1)]

instack = new_array(n, False)
low = new_array(n)
dfn = new_array(n, 0)
stap = new_array(n)
belong = new_array(n)
var = [0, 0, 0] # cnt, bc, stop
# cnt = bc = stop = 0

def dfs(cur):
var[0] += 1
dfn[cur] = low[cur] = var[0]
instack[cur] = True
stap[var[2]] = cur
var[2] += 1

for v in graph.edges[cur]:
if dfn[v.end] == 0:
dfs(v.end)
low[cur] = min(low[cur], low[v.end])
elif instack[v.end]:
low[cur] = min(low[cur], dfn[v.end])

if dfn[cur] == low[cur]:
v = cur + 1 # set v != cur
var[1] += 1
while v != cur:
var[2] -= 1
v = stap[var[2]]
instack[v] = False
belong[v] = var[1]

for i in range(n):
if dfn[i+1] == 0:
dfs(i+1)

return belong


class TestGraph(unittest.TestCase):

def test_self_loop(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=True)
has_self_loop = max([e.start == e.end for e in graph.iterate_edges()])
if has_self_loop:
break
self.assertTrue(has_self_loop)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), self_loop=False)
self.assertFalse(max([e.start == e.end for e in graph.iterate_edges()]))

def test_repeated_edges(self):
graph_size = 20
for _ in range(20):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=True)
edges = [(e.start, e.end) for e in graph.iterate_edges()]
has_repeated_edges = len(edges) > len(set(edges))
if has_repeated_edges:
break
self.assertTrue(has_repeated_edges)

for _ in range(10):
graph = Graph.graph(graph_size, int(graph_size*2), repeated_edges=False)
edges = list(graph.iterate_edges())
self.assertEqual(len(edges), len(set(edges)))

def test_tree_connected(self):
graph_size = 20
for _ in range(20):
ufs = UnionFindSet(graph_size)
tree = Graph.tree(graph_size)
for edge in tree.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))


def test_DAG(self):
graph_size = 20
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=True)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

def test_DAG_without_loop(self):
graph_size = 20
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.DAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False, loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

belong = tarjan(graph, graph_size)
self.assertEqual(max(belong), graph_size)

def test_undirected_graph(self):
graph_size = 20
for _ in range(10): # test 10 times
ufs = UnionFindSet(graph_size)
graph = Graph.UDAG(graph_size, int(graph_size*1.6), repeated_edges=False, self_loop=False)

self.assertEqual(len(list(graph.iterate_edges())), int(graph_size*1.6))

for edge in graph.iterate_edges():
ufs.merge(edge.start, edge.end)
for i in range(graph_size-1):
self.assertTrue(ufs.test_same(i+1, i+2))

def test_DAG_boundary(self):
with self.assertRaises(Exception, msg="the number of edges of connected graph must more than the number of nodes - 1"):
Graph.DAG(8, 6)
Graph.DAG(8, 7)