Skip to content

Commit

Permalink
fix: analysis summary didn't look for genes in root analysis (#27)
Browse files Browse the repository at this point in the history
* fix: analysis summary didn't look for genes in root analysis

* feat: add query mode for s1 integration script
  • Loading branch information
davidt99 authored Jan 13, 2022
1 parent 792ab1b commit c4b9b1c
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.6.2
-------
- Fix: analysis summary didn't look for genes in root analysis


1.6.1
-------
- Fix: Handle no iocs correctly
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ Basic SDK for Intezer Analyze API 2.0

[View full API documentation](https://analyze.intezer.com/api/docs/documentation) (Notice - You must be logged in to Intezer Analyze to access the documentation)

Currently the following options are available in the SDK:
Currently, the following options are available in the SDK:

- Analyze by file
- Analyze by SHA256
- Index by file
- Index by SHA256
- Get Latest Analysis
- Account and file related samples
- Code reuse and metadata
- Code reuse and Metadata
- IOCs, Dynamic TTPs and Capabilities
- Strings related samples
- Search a family

Expand Down Expand Up @@ -138,6 +139,9 @@ You can find more code examples under [analyze-python-sdk/examples/](https://git

## Changelog

### 1.6.2
- Fix: analysis summary didn't look for genes in root analysis

### 1.6.1
- Fix: Handle no iocs correctly

Expand Down
94 changes: 63 additions & 31 deletions examples/sentinel_one_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import datetime
import io
import logging
import logging.handlers
import secrets
import sys
import time
import urllib.parse
from http import HTTPStatus
Expand All @@ -13,16 +15,18 @@

import requests
import requests.adapters

from intezer_sdk import api
from intezer_sdk import errors
from intezer_sdk import util
from intezer_sdk.analysis import Analysis
from intezer_sdk.util import get_analysis_summary

_s1_session: Optional[requests.Session] = None
_logger = logging.getLogger('intezer')


class BaseUrlSession(requests.Session):
"""Taken from https://github.com/requests/toolbelt/blob/master/requests_toolbelt/sessions.py"""
base_url = None

def __init__(self, base_url=None):
Expand All @@ -31,23 +35,23 @@ def __init__(self, base_url=None):
super(BaseUrlSession, self).__init__()

def request(self, method, url, *args, **kwargs):
'Send the request after generating the complete URL.'
"""Send the request after generating the complete URL."""
url = self.create_url(url)
return super(BaseUrlSession, self).request(
method, url, *args, **kwargs
)

def prepare_request(self, request):
'Prepare the request after generating the complete URL.'
"""Prepare the request after generating the complete URL."""
request.url = self.create_url(request.url)
return super(BaseUrlSession, self).prepare_request(request)

def create_url(self, url):
'Create the URL based off this partial path.'
"""Create the URL based off this partial path."""
return urllib.parse.urljoin(self.base_url, url)


def init_s1_requests_session(api_token: str, base_url: str, skip_ssl_verification: bool=False):
def init_s1_requests_session(api_token: str, base_url: str, skip_ssl_verification: bool):
headers = {'Authorization': 'ApiToken ' + api_token}
global _s1_session
_s1_session = BaseUrlSession(base_url)
Expand All @@ -60,6 +64,7 @@ def init_s1_requests_session(api_token: str, base_url: str, skip_ssl_verificatio
def analyze_by_file(threat_id: str):
    """Fetch the threat's file from SentinelOne and build an Intezer analysis for it.

    The file arrives as a password-protected zip; the returned Analysis is
    created here but not sent — the caller decides when to submit it.
    """
    fetch_url, archive_password = fetch_file(threat_id)
    file_stream = download_file(fetch_url)
    _logger.debug('starting to analyze file')
    return Analysis(file_stream=file_stream,
                    file_name=f'{threat_id}.zip',
                    zip_password=archive_password)

Expand All @@ -68,12 +73,13 @@ def fetch_file(threat_id: str) -> Tuple[str, Optional[str]]:
zip_password = secrets.token_urlsafe(32)
fetch_file_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=5)

_logger.debug('sending fetch command to the endpoint')
response = _s1_session.post('/web/api/v2.1/threats/fetch-file',
json={'data': {'password': zip_password}, 'filter': {'ids': [threat_id]}})
assert_s1_response(response)

for c in range(20):
_logger.debug(f'starting to fetch file with request number {c}')
for count in range(20):
_logger.debug(f'waiting for s1 to fetch the file from the endpoint ({count})')
time.sleep(10)
response = _s1_session.get('/web/api/v2.1/activities',
params={'threatIds': threat_id,
Expand All @@ -87,20 +93,18 @@ def fetch_file(threat_id: str) -> Tuple[str, Optional[str]]:
if download_url:
return download_url, zip_password
else:
err_msg = 'Time out fetching the file, this is most likely when the endpoint is powered off' \
'or the agent is shut down'
err_msg = ('Time out fetching the file, this is most likely when the endpoint is powered off'
'or the agent is shut down')

_logger.debug(err_msg)
raise Exception(err_msg)


def download_file(download_url: str):
    """Download the threat file that SentinelOne fetched from the endpoint.

    :param download_url: relative download path returned by the fetch-file activity.
    :return: in-memory binary stream with the downloaded (zipped) file content.
    """
    # NOTE(review): the original span interleaved pre- and post-change debug
    # lines from a rendered diff; only the post-change logging is kept here.
    _logger.debug(f'downloading file from s1 (download url of {download_url})')
    response = _s1_session.get('/web/api/v2.1' + download_url)
    assert_s1_response(response)
    _logger.debug('download finished')  # no placeholders, so a plain string (was an f-string)

    file = io.BytesIO(response.content)
    return file
Expand Down Expand Up @@ -139,8 +143,8 @@ def filter_threat(threat_info: dict) -> bool:
return threat_info['agentDetectionInfo']['agentOsName'].lower().startswith(('linux', 'windows'))


def send_note(threat_id: str, analysis: Analysis):
    """Post the Intezer analysis summary as a note on the SentinelOne threat.

    :param threat_id: SentinelOne threat id the note is attached to.
    :param analysis: completed Intezer analysis whose summary becomes the note text.
    """
    # NOTE(review): the diff residue also kept the old
    # (threat_id, analysis, no_emojis) signature; the post-change
    # two-argument form calling util.get_analysis_summary is kept here.
    note = util.get_analysis_summary(analysis)

    response = _s1_session.post('/web/api/v2.1/threats/notes',
                                json={'data': {'text': note}, 'filter': {'ids': [threat_id]}})
Expand All @@ -153,12 +157,11 @@ def send_failure_note(note: str, threat_id: str):
assert_s1_response(response)


def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str, threat_id: str, skip_ssl_verification: bool=False, no_emojis: bool=False):
api.set_global_api(intezer_api_key)
init_s1_requests_session(s1_api_key, s1_base_address, skip_ssl_verification)
def analyze_threat(threat_id: str, threat: dict = None):
_logger.info(f'incoming threat: {threat_id}')
try:
threat = get_threat(threat_id)
if not threat:
threat = get_threat(threat_id)
if not filter_threat(threat):
_logger.info(f'threat {threat_id} is been filtered')
return
Expand All @@ -176,15 +179,14 @@ def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str,
analysis = None

if not analysis:
_logger.debug('starting to analyze file')
analysis = analyze_by_file(threat_id)
analysis.send(requester='s1')

_logger.debug('waiting for analysis completion')
analysis.wait_for_completion()
_logger.debug('analysis completed')

send_note(threat_id, analysis, no_emojis)
send_note(threat_id, analysis)
except Exception as ex:
send_failure_note(str(ex), threat_id)

Expand All @@ -197,22 +199,52 @@ def parse_argparse_args():
parser.add_argument('-i', '--intezer-api-key', help='Intezer API key', required=True)
parser.add_argument('-s', '--s1-api-key', help='S1 API Key', required=True)
parser.add_argument('-a', '--s1-base-address', help='S1 base address', required=True)
parser.add_argument('-t', '--threat-id', help='S1 threat id', required=True)
parser.add_argument('-sv', '--skip-ssl-verification', action='store_true',
help='Skipping SSL verification on S1 request')
parser.add_argument('-ne', '--no-emojis', action='store_true', help="Don't show emojis")
subparser_options = {}
if sys.version_info >= (3, 7):
subparser_options['required'] = True

subparsers = parser.add_subparsers(title='valid subcommands', dest='subcommand', **subparser_options)
threat_parser = subparsers.add_parser('threat', help='Get a threat ID and analyze it')
threat_parser.add_argument('threat_id', help='SentinelOne threat id')
subparsers.add_parser('query', help='Analyze new incoming threat')

return parser.parse_args()


if __name__ == '__main__':
args = parse_argparse_args()
def _init_logger():
    """Configure the module logger: debug level, timestamped output to a stream handler."""
    stream_handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
    stream_handler.setFormatter(formatter)
    _logger.setLevel(logging.DEBUG)
    _logger.addHandler(stream_handler)

analyze_threat(args.intezer_api_key,
args.s1_api_key,
args.s1_base_address,
args.threat_id,
args.skip_ssl_verification,
args.no_emojis)

def query_threats():
    """Poll SentinelOne for newly created threats and analyze each one.

    Loops forever, querying `/web/api/v2.1/threats` for threats created since
    the previous query and sleeping ~10 seconds between rounds. Raises if a
    SentinelOne request fails (via assert_s1_response), which stops the loop.
    """
    next_time_query = datetime.datetime.utcnow()
    while True:
        _logger.info('checking for new threats...')
        # Stamp the next window's start *before* issuing the request, so threats
        # created while the request is in flight land in the next window (the
        # original stamped the time after the response, leaving a blind gap).
        query_time = datetime.datetime.utcnow()
        response = _s1_session.get('/web/api/v2.1/threats',
                                   params={'createdAt__gte': next_time_query.isoformat()})
        assert_s1_response(response)
        next_time_query = query_time
        threats = response.json()['data']
        for threat in threats:
            analyze_threat(threat['id'], threat)

        if not threats:
            _logger.info('no new threats found')
        time.sleep(10)


if __name__ == '__main__':
    _args = parse_argparse_args()

    # Global setup: Intezer SDK key, SentinelOne HTTP session, verbose logging.
    api.set_global_api(_args.intezer_api_key)
    init_s1_requests_session(_args.s1_api_key, _args.s1_base_address, _args.skip_ssl_verification)
    _init_logger()

    if _args.subcommand == 'query':
        query_threats()
    elif _args.subcommand == 'threat':
        analyze_threat(_args.threat_id)
    else:
        # argparse can only mark subparsers as required on Python >= 3.7,
        # so the missing-subcommand case is enforced by hand here.
        print('error: the following arguments are required: subcommand')
        sys.exit(1)
2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.6.1'
__version__ = '1.6.2'
8 changes: 5 additions & 3 deletions intezer_sdk/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import collections
import itertools
from typing import List
from typing import Optional
from typing import Tuple
Expand All @@ -14,7 +15,7 @@
}


def get_analysis_summary(analysis: Analysis, no_emojis: bool=False) -> str:
def get_analysis_summary(analysis: Analysis, no_emojis: bool = False) -> str:
result = analysis.result()

metadata = analysis.get_root_analysis().metadata
Expand Down Expand Up @@ -96,21 +97,22 @@ def get_analysis_family(analysis: Analysis, software_type_priorities: List[str])
def get_analysis_family_by_family_id(analysis: 'Analysis', family_id: str) -> int:
    """Return the highest reused-gene count reported for ``family_id``.

    Scans the root analysis as well as every sub-analysis (the root was
    previously skipped — the fix this commit introduces) and returns the
    largest ``reused_gene_count`` among matching families, or 0 if none match.

    :param analysis: analysis whose root and sub-analyses are inspected.
    :param family_id: id of the family to look for.
    :return: largest reused gene count found for the family, 0 when absent.
    """
    # NOTE(review): the original span interleaved the pre-change loop line
    # (sub-analyses only) with the post-change one; only the post-change
    # version, which includes the root analysis, is kept here.
    reused_gene_count = 0

    for sub_analysis in itertools.chain([analysis.get_root_analysis()], analysis.get_sub_analyses()):
        if not sub_analysis.code_reuse:
            continue

        for family in sub_analysis.code_reuse['families']:
            if family['family_id'] == family_id:
                if family['reused_gene_count'] > reused_gene_count:
                    reused_gene_count = family['reused_gene_count']
                break  # assumes a family id appears at most once per code_reuse listing — carried from original
    return reused_gene_count


def find_largest_family(analysis: Analysis) -> dict:
largest_family_by_software_type = collections.defaultdict(lambda: {'reused_gene_count': 0})
for sub_analysis in analysis.get_sub_analyses():
for sub_analysis in itertools.chain([analysis.get_root_analysis()], analysis.get_sub_analyses()):
for family in sub_analysis.code_reuse['families']:
software_type = family['family_type']
if family['reused_gene_count'] > largest_family_by_software_type[software_type]['reused_gene_count']:
Expand Down

0 comments on commit c4b9b1c

Please sign in to comment.