-
Notifications
You must be signed in to change notification settings - Fork 2
/
value_generator.py
176 lines (151 loc) · 6.12 KB
/
value_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
This file is part of the SNMP agent simulator,
which is used to simulate HUAWEI server iBMC/HMM SNMP interfaces.
author: Jack Zhang
created: 2017/4/7
"""
import sys
import random
import json
from pyasn1.type import univ
from pyasn1.error import PyAsn1Error
from pysnmp.proto import rfc1902
# Word pool from which random OctetString values are assembled.
stringPool = 'Love is a flower that lives on the cliff You can pick up the flower with nothing but your courage and adventurousness'.split()

# (low, high) pairs handed to random.randrange() for each SNMP type,
# so the high bound itself is never produced.
counterRange = (0, 2 ** 32 - 1)
counter64Range = (0, 2 ** 64 - 1)
gaugeRange = (0, 2 ** 32 - 1)
timeticksRange = (0, 2 ** 32 - 1)
unsignedRange = (0, 0xffff)
int32Range = (0, 100)  # small values are more likely to satisfy constraints

# Number of automatic guesses generateValue makes before asking the user.
automaticValues = 5000
# Registry of custom node descriptions, keyed by dotted OID string.
# Filled by loadMibJson() and consulted when values are generated.
mibJsons = {}


def loadMibJson(mibFile):
    """Load ``mibjson/<mibFile>.json`` and register every node that has an OID.

    The file path is relative to the current working directory. The
    top-level ``imports`` and ``meta`` entries are skipped, as is any entry
    that is not a JSON object or carries no ``oid`` key. Each remaining
    entry is stored in ``mibJsons`` under its ``oid`` value; an OID that is
    already registered is overwritten.

    :param mibFile: MIB module name, i.e. the JSON file name without the
        ``.json`` extension.
    :raises IOError: (``OSError`` on Python 3) if the file cannot be opened.
    :raises ValueError: if the file does not contain valid JSON.
    """
    jsonFile = 'mibjson/' + mibFile + '.json'
    # The with-statement closes the file; no explicit close() is needed.
    with open(jsonFile) as jsonData:
        jsonObject = json.load(jsonData)

    # items() exists on both Python 2 and 3, unlike iteritems().
    for key, obj in jsonObject.items():
        if key in ('imports', 'meta'):
            continue
        # Guard against scalar entries: "'oid' in obj" on a string would be
        # a substring test and obj['oid'] would then raise TypeError.
        if not isinstance(obj, dict) or 'oid' not in obj:
            continue
        mibJsons[obj['oid']] = obj
def generateCustomValue(node):
    """Pick a random value for *node* from the constraints in ``mibJsons``.

    The node's OID is looked up in ``mibJsons``; if it is absent, the OID
    with its last sub-identifier removed is tried instead. When the matching
    entry has ``syntax.constraints``:

    * ``enumeration`` -- one of the enumeration's values is chosen at random;
    * ``range`` -- an integer between the first range's ``min`` (default 0)
      and ``max`` (default 100), both inclusive, is chosen at random.

    :param node: object whose ``name`` attribute renders via ``str()`` as a
        tuple of sub-identifiers, e.g. ``(1, 3, 6, 1)``.
    :return: the chosen value, or ``None`` when no entry or no usable
        constraint is found.
    """
    # "(1, 3, 6)" -> "1.3.6"
    oid = str(node.name).replace(', ', '.').strip('()')
    if oid not in mibJsons:
        # Retry with the last sub-identifier dropped.
        oid = oid[:oid.rfind(".")]
        if oid not in mibJsons:
            return None

    customNode = mibJsons[oid]
    if 'syntax' not in customNode or 'constraints' not in customNode['syntax']:
        return None
    constraints = customNode['syntax']['constraints']

    if 'enumeration' in constraints:
        # list() is required on Python 3, where values() is a non-indexable view.
        choices = list(constraints['enumeration'].values())
        # A single-member enumeration is honoured too: it is the only legal value.
        if choices:
            return random.choice(choices)
    elif 'range' in constraints:
        ranges = constraints['range']
        if ranges:
            low = ranges[0].get('min', 0)
            high = ranges[0].get('max', 100)
            return random.randint(low, high)  # inclusive on both ends
    return None
def generateValue(node, rowNum):
    """Produce a value that ``node.syntax`` accepts.

    Three strategies are tried in order:

    1. a value derived from the JSON settings via ``generateCustomValue``;
    2. up to ``automaticValues`` random guesses chosen by the syntax type;
    3. an interactive prompt (written to stderr, answered on stdin) that is
       repeated until the entered value is accepted.

    At the prompt, a line starting with ``0x`` is read as hexadecimal
    (octets for OctetString syntaxes, an integer otherwise). Start the line
    with ``0x0x`` to enter a literal string that begins with ``0x``. An
    empty line simply re-prompts.

    :param node: MIB node; its ``name`` and ``syntax`` attributes are used.
    :param rowNum: table row index. When greater than -1 it is used verbatim
        as the value of Integer32 nodes and generated strings are limited to
        a single word; pass -1 otherwise.
    :return: ``node.syntax.clone(value)`` for the first accepted value.
    :raises EOFError: if standard input is exhausted while prompting.
    """
    syntax = node.syntax

    # First try to generate an accurate value according to the JSON settings.
    val = generateCustomValue(node)
    try:
        if val is not None:
            return syntax.clone(val)
    except PyAsn1Error:
        # The custom value violates the syntax constraints; guess instead.
        pass

    makeGuess = automaticValues

    while True:
        if makeGuess:
            # Pick a value
            if isinstance(syntax, rfc1902.IpAddress):
                val = '.'.join([str(random.randrange(1, 256)) for _ in range(4)])
            elif isinstance(syntax, rfc1902.TimeTicks):
                val = random.randrange(timeticksRange[0], timeticksRange[1])
            elif isinstance(syntax, rfc1902.Gauge32):
                val = random.randrange(gaugeRange[0], gaugeRange[1])
            elif isinstance(syntax, rfc1902.Counter32):
                val = random.randrange(counterRange[0], counterRange[1])
            elif isinstance(syntax, rfc1902.Integer32):
                if rowNum > -1:  # for integer, table row index
                    val = rowNum
                else:
                    val = random.randrange(int32Range[0], int32Range[1])
            elif isinstance(syntax, rfc1902.Unsigned32):
                val = random.randrange(unsignedRange[0], unsignedRange[1])
            elif isinstance(syntax, rfc1902.Counter64):
                val = random.randrange(counter64Range[0], counter64Range[1])
            elif isinstance(syntax, univ.OctetString):
                maxWords = 2 if rowNum > -1 else 10
                val = ' '.join([stringPool[random.randrange(0, len(stringPool))]
                                for _ in range(random.randrange(1, maxWords))])
            elif isinstance(syntax, univ.ObjectIdentifier):
                val = '.'.join(['1', '3', '6', '1', '3'] + [
                    '%d' % random.randrange(0, 255) for _ in range(random.randrange(0, 10))])
            elif isinstance(syntax, rfc1902.Bits):
                # NOTE(review): if rfc1902.Bits derives from OctetString (it
                # appears to in pysnmp), the OctetString branch above wins and
                # this branch never runs -- confirm before reordering.
                val = [random.randrange(0, 256) for _ in range(random.randrange(0, 9))]
            else:
                val = '?'

        try:
            if val is not None:
                return syntax.clone(val)
        except PyAsn1Error:
            # Report only when the last automatic guess has failed.
            if makeGuess == 1:
                sys.stderr.write(
                    '*** Inconsistent value: %s\r\n*** See constraints and suggest a better one for:\r\n' % (
                        sys.exc_info()[1],)
                )

        if makeGuess:
            makeGuess -= 1
            continue

        # Automatic guessing is exhausted: ask the operator.
        sys.stderr.write('%s# Value [\'%s\'] ? ' % (node.name, '<none>' if val is None else val))
        sys.stderr.flush()

        raw = sys.stdin.readline()
        if not raw:
            # readline() returns '' only at EOF; without this check the loop
            # would re-prompt forever because nobody can answer.
            raise EOFError('no acceptable value for %s and standard input is exhausted' % (node.name,))

        line = raw.strip()
        if not line:
            continue  # empty answer: retry the current value / prompt again

        if line[:2] != '0x':
            val = line
        elif line[:4] == '0x0x':
            # Escaped prefix: the operator wants a literal starting with "0x".
            val = line[2:]
        else:
            try:
                if isinstance(syntax, univ.OctetString):
                    val = syntax.clone(hexValue=line[2:])
                else:
                    val = int(line[2:], 16)
            except (ValueError, PyAsn1Error):
                # Malformed hex: keep the previous value and prompt again.
                sys.stderr.write('*** Bad hexadecimal value: %s\r\n' % (sys.exc_info()[1],))
"""
This class is used to generate SNMP object values.
Value generation is dynamic: we first check the constraints,
then generate a value according to the object type.
"""
def createVariable(SuperClass, typeName, instId, syntax, val):
    """Build a ``ScalarInstance`` (a subclass of *SuperClass*) and return an
    instance constructed as ``ScalarInstance(typeName, instId, syntax)``.

    The instance answers ``readGet`` with the fixed *val*, unless *val* is
    ``None`` or the node's ``mibJsons`` entry has ``syntax.dynamic == 1``;
    in those cases a new value is generated on every read.
    """

    class ScalarInstance(SuperClass):
        # getValue is the hook called to retrieve a generated value for the scalar
        def getValue(self):
            return generateValue(self, -1)

        def readGet(self, name, *args):
            # "(1, 3, 6)" -> "1.3.6"; fall back to the OID without its last
            # sub-identifier when the full OID is not registered.
            dotted = str(self.name).replace(', ', '.').strip('()')
            key = dotted if dotted in mibJsons else dotted[:dotted.rfind(".")]

            regenerate = False
            if key in mibJsons and 'syntax' in mibJsons[key]:
                spec = mibJsons[key]['syntax']
                regenerate = ("dynamic" in spec) and (spec['dynamic'] == 1)

            if regenerate or val is None:
                return name, self.getValue()
            return name, val

    return ScalarInstance(typeName, instId, syntax)
if __name__ == '__main__':
    # Direct execution is not supported: point the user at hwsim.py and
    # terminate with exit code -1.
    sys.stderr.write('Please run hwsim.py to start SNMP simulator!')
    raise SystemExit(-1)