forked from kd8bny/LiMEaide
-
Notifications
You must be signed in to change notification settings - Fork 0
/
limeaide.py
314 lines (264 loc) · 11.6 KB
/
limeaide.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#!/usr/bin/env python3
import argparse
import configparser
import getpass
import logging
import os
import pickle
import shutil
import sys
import urllib.error
import urllib.request
import zipfile
from datetime import datetime

from termcolor import colored, cprint

from lib.session import Session
from lib.client import Client
from lib.deploy_lime import LimeDeploy
from lib.deploy_volatility import VolDeploy
from lib.profiler import Profiler
class Limeaide(object):
    """Deploy LiME LKM to remote host in order to scrape RAM.

    The class wires together command-line parsing, local directory and
    tool checks, client/session creation and the LiME / Volatility
    deployment helpers imported from ``lib``.
    """

    __version__ = "1.4.1"
    # NOTE(review): this value looks like an e-mail obfuscation placeholder
    # rather than a real address -- confirm against the upstream project.
    __author__ = "[email protected]"

    def __init__(self):
        """Set the default working paths; nothing is created on disk here."""
        super(Limeaide, self).__init__()
        self.logger = None
        # Filled in by check_tools() from the '.limeaide' config file.
        self.volatility_profile_dir = None
        self.lime_dir = './tools/LiME/src/'
        self.tools_dir = './tools/'
        self.output_dir = './output/'
        self.profile_dir = './profiles/'
        self.log_dir = './logs/'
        self.scheduled_pickup_dir = './scheduled_jobs/'
        # Optional 'case_<n>' prefix for the output directory (see main()).
        self.args_case = ''

    @staticmethod
    def get_args():
        """Parse the command line.

        Help texts use implicit string concatenation instead of backslash
        continuations inside the literal, so no source indentation leaks
        into the text and no word is split in two.

        Returns:
            argparse.Namespace: the parsed arguments (read from sys.argv).
        """
        parser = argparse.ArgumentParser(
            description="Utility designed to automate GNU/Linux memory "
                        "forensics")
        parser.add_argument("remote", help="remote host IP")
        parser.add_argument(
            "-u", "--user",
            help="use a sudo user instead; default: root")
        parser.add_argument(
            "-N", "--no-profiler", action="store_true",
            help="Do NOT run profiler and force compile new module/profile "
                 "for client")
        parser.add_argument(
            "-p", "--profile", nargs=3, metavar=('distro', 'kver', 'arch'),
            help="Skip the profiler by providing the distribution, kernel "
                 "version, and architecture of the remote client.")
        parser.add_argument(
            "-C", "--dont-compress", action="store_true",
            help="Do NOT compress dump into Bzip2 format")
        parser.add_argument("-o", "--output", help="name the output file")
        parser.add_argument(
            "-c", "--case", help="Append case number to output dir")
        parser.add_argument(
            "--delay-pickup", action="store_true",
            help="Used to store job for future pickup")
        parser.add_argument("-P", "--pickup", help="Enter stored job file")
        parser.add_argument(
            "-v", "--verbose", action="store_true",
            help="Produce verbose output from remote client")
        parser.add_argument(
            "--force-clean", action="store_true",
            help="Force clean client after failed deployment")

        return parser.parse_args()

    def check_directories(self):
        """Create every working directory that does not exist yet.

        ``os.makedirs(..., exist_ok=True)`` replaces the check-then-create
        pattern: it is free of the isdir/mkdir race and also copes with a
        configured path whose parent is missing.
        """
        working_dirs = (
            self.output_dir,
            self.profile_dir,
            self.tools_dir,
            self.log_dir,
            self.scheduled_pickup_dir,
        )
        for directory in working_dirs:
            os.makedirs(directory, exist_ok=True)

    def check_tools(self):
        """Check for required tools and directories.

        * Writes a default '.limeaide' config file if none exists.
        * Downloads and unpacks LiME into the tools directory when
          ``self.lime_dir`` is missing; exits with status 1 if the download
          fails.
        * Reads the Volatility directory from the config, interactively
          asking for it when it is unset or not a directory, and stores the
          resulting value in ``self.volatility_profile_dir``.
        """
        if not os.path.isfile('.limeaide'):
            config = configparser.RawConfigParser()
            config.set('DEFAULT', 'volatility', '')
            config.set('DEFAULT', 'output', '')
            config.set('DEFAULT', 'compress', '')
            with open('.limeaide', 'w+') as config_file:
                config.write(config_file)

        # Download LiME
        if not os.path.isdir(self.lime_dir):
            lime_version = '1.7.8.2'
            archive = os.path.join(self.tools_dir, 'lime_master.zip')
            cprint("Downloading LiME", 'green')
            try:
                urllib.request.urlretrieve(
                    "https://github.com/kd8bny/LiME/archive/v{}.zip".format(
                        lime_version), filename=archive)
                # Context manager closes the archive even if extraction
                # raises.
                with zipfile.ZipFile(archive, 'r') as zip_lime:
                    zip_lime.extractall(self.tools_dir)
                shutil.move(
                    os.path.join(
                        self.tools_dir, 'LiME-{}'.format(lime_version)),
                    os.path.join(self.tools_dir, 'LiME/'))
            except urllib.error.URLError:
                cprint(
                    "LiME failed to download. Check your internet connection "
                    "or place manually", 'red')
                # Non-zero status: this is a failure, not a clean exit.
                sys.exit(1)

        # Check to see if a volatility directory exists in config
        config = configparser.ConfigParser()
        config.read('.limeaide')
        config_vol_dir = config['DEFAULT']['volatility']
        if config_vol_dir == 'None':
            # The literal string 'None' is what gets stored when the user
            # answered 'q' ("never ask again") on an earlier run.
            pass
        elif not config_vol_dir or not os.path.isdir(config_vol_dir):
            cprint(
                "Volatility directory missing. Current directory is:", 'red')
            cprint("{}".format(config_vol_dir), 'blue')
            cprint("Please provide a path to your Volatility directory." +
                   "\ne.g. '~/volatility/'" +
                   "\n[q] to never ask again: ", 'green')

            path_ext = '/volatility/plugins/overlays/linux/'
            while True:
                path = input(":").strip()
                if path == 'q':
                    path = 'None'
                    path_ext = ''
                    break
                # The prompt itself suggests '~/volatility/', so expand '~'
                # before testing; os.path.isdir does not do it.
                expanded = os.path.expanduser(path)
                if os.path.isdir(expanded):
                    path = expanded
                    break
                cprint(
                    "Entered directory does not exist. Please enter "
                    "again", 'red')

            # Keep the local value in sync with what is persisted, so the
            # attribute below reflects the answer just given instead of the
            # stale (empty or invalid) value read before prompting.
            config_vol_dir = path + path_ext
            config.set('DEFAULT', 'volatility', config_vol_dir)
            with open('.limeaide', 'w+') as configfile:
                config.write(configfile)

        self.volatility_profile_dir = config_vol_dir

    @staticmethod
    def get_client(args, config):
        """Return instantiated client.

        Config will provide global overrides: the ``--output`` and
        ``--dont-compress`` arguments are only applied when the matching
        entry in the config's DEFAULT section is empty.

        Args:
            args: parsed command-line namespace from get_args().
            config: mapping-like config with a 'DEFAULT' section holding
                'output' and 'compress' entries.

        Returns:
            The populated Client; its password is read with getpass.
        """
        client = Client()
        date = datetime.strftime(datetime.today(), "%Y_%m_%dT%H_%M_%S_%f")
        client.ip = args.remote
        client.jobname = "{0}-{1}-worker".format(client.ip, date)

        if args.user is not None:
            client.user = args.user
            client.is_sudoer = True

        if args.delay_pickup:
            client.delay_pickup = True

        if not config['DEFAULT']['output'] and args.output is not None:
            client.output = args.output

        if not config['DEFAULT']['compress'] and args.dont_compress:
            # The flag means "do not compress": set it explicitly rather
            # than toggling whatever the Client default happens to be.
            client.compress = False

        cprint("> Establishing secure connection {0}@{1}".format(
            client.user, client.ip), 'blue')
        client.pass_ = getpass.getpass()

        return client

    def save_job(self, client, jobname):
        """Save client with pickle.

        The file is written to ``<scheduled_pickup_dir><jobname>.dat``.

        NOTE(review): the whole client object is pickled; main() stores the
        password on it as ``pass_``, so it presumably ends up in this file
        in clear text -- confirm and protect the file accordingly.
        """
        job_path = "{0}{1}.dat".format(self.scheduled_pickup_dir, jobname)
        with open(job_path, 'wb') as job_file:
            pickle.dump(client, job_file)

    def finish_saved_job(self, jobname):
        """Restore client with pickle. Transfer dump.

        Args:
            jobname: path of the job file written by save_job(); it is
                removed once the pickup has completed.
        """
        # SECURITY: unpickling can execute arbitrary code. Only load job
        # files that this tool wrote itself and that nobody else could
        # have modified.
        with open(jobname, 'rb') as job_file:
            restored_client = pickle.load(job_file)
        cprint("Client restored!", 'green')
        cprint(
            'Retrieving RAM dump "{}"'.format(restored_client.output), 'blue')

        os.makedirs(restored_client.output_dir, exist_ok=True)

        saved_session = Session(restored_client)
        saved_session.connect()
        delayed_profiler = Profiler()
        LimeDeploy(saved_session, delayed_profiler).transfer_dump()
        VolDeploy(saved_session).main(self.volatility_profile_dir)
        cprint(
            "Job {} pickup has been completed!".format(
                restored_client.output), 'green')
        saved_session.disconnect()
        os.remove(jobname)

    def main(self):
        """Start the interactive session for LiMEaide."""
        # Raw string: the art is full of backslashes that are not valid
        # escape sequences. The resulting text is identical to the original
        # non-raw literal; the trailing newline is appended separately.
        banner = r""".---. _______
| |.--. __ __ ___ __.....__ .--.\ ___ `'. __.....__
| ||__|| |/ `.' `. .-'' '. |__| ' |--.\ \ .-'' '.
| |.--.| .-. .-. ' / .-''"'-. `. .--. | | \ ' / .-''"'-. `.
| || || | | | | |/ /________\ \ __ | | | | | '/ /________\ |
| || || | | | | || | .:--.'. | | | | | || |
| || || | | | | |\ .-------------'/ | \ || | | | ' .'\ .-------------'
| || || | | | | | \ '-.____...---.`" __ | || | | |___.' /' \ '-.____...---.
| ||__||__| |__| |__| `. .' .'.''| ||__|/_______.'/ `. .'
'---' `''-...... -' / / | |_ \_______|/ `''-...... -'
\ \._,\ '/
`--' `"
by kd8bny {0}""" + "\n"
        cprint(banner.format(self.__version__), 'green', attrs=['bold'])
        print(
            "LiMEaide is licensed under GPL-3.0\n"
            "LiME is licensed under GPL-2.0\n")

        # One timestamp names both the log file and the output directory.
        date = datetime.strftime(datetime.today(), "%Y_%m_%dT%H_%M_%S_%f")
        self.check_directories()
        self.check_tools()

        logging.basicConfig(
            level=logging.INFO, filename='{0}{1}.log'.format(
                self.log_dir, date))
        self.logger = logging.getLogger()

        args = self.get_args()
        config = configparser.ConfigParser()
        config.read('.limeaide')
        profiler = Profiler()
        profiler.load_profiles()
        client = self.get_client(args, config)

        if args.pickup:
            self.finish_saved_job(args.pickup)
            sys.exit()

        if args.case is not None:
            self.args_case = 'case_%s' % (args.case)

        # Start session
        session = Session(client, args.verbose)
        session.connect()
        client.output_dir = "{0}{1}{2}/".format(
            self.output_dir, self.args_case, date)
        os.mkdir(client.output_dir)

        if args.force_clean:
            session.disconnect()
            sys.exit("Clean attempt complete")

        if args.profile is not None:
            profile = profiler.select_profile(
                args.profile[0], args.profile[1], args.profile[2])
            if profile is None:
                new_profile = input(colored(
                    "No profiles found... Would you like to build a new "
                    "profile for the remote client [Y/n]", 'red'))
                if new_profile.lower() == 'n':
                    # Do not leave the remote session open on the way out.
                    session.disconnect()
                    sys.exit()
            else:
                client.profile = profile
                cprint("Profile found!", 'green')

        elif not args.no_profiler:
            # Only an explicit 'y' selects a profile, so the prompt marks
            # "no" as the default answer.
            use_profile = input(colored(
                "Would you like to select a pre-generated profile "
                "[y/N]", 'green'))
            if use_profile.lower() == 'y':
                profile = profiler.interactive_chooser()
                if profile is None:
                    cprint("No profiles found... Will build new profile "
                           "for remote client", 'red')
                else:
                    client.profile = profile

        LimeDeploy(session, profiler).main()

        if args.delay_pickup:
            self.save_job(client, client.jobname)
            cprint("> RAM dump retrieval is postponed", 'green')
            cprint(
                "> To retrieve, run LiMEaide with " +
                '"-P scheduled_jobs/{}.dat"'.format(client.jobname), 'yellow')
        else:
            # Now that's taken care of, lets do work on Volatility
            VolDeploy(session).main(self.volatility_profile_dir)

        session.disconnect()
        logging.shutdown()
if __name__ == '__main__':
    # Script entry point: build the application object and hand control
    # to its interactive main routine.
    limeaide_app = Limeaide()
    limeaide_app.main()