Skip to content

Commit

Permalink
feat: add iocs, dynamic ttps, capabilities (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidt99 authored Dec 29, 2021
1 parent f9aec49 commit c52e763
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 28 deletions.
5 changes: 4 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
Not released
1.5
------
- Add family search
- Support for zip password
- Add iocs and dynamic ttps to analysis
- Add capabilities to sub analysis

1.4.5.2
------
Expand Down
2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.4.5.2'
__version__ = '1.5'
54 changes: 48 additions & 6 deletions intezer_sdk/analysis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import logging
import os
import time
import typing
from http import HTTPStatus
Expand All @@ -8,7 +9,8 @@
from intezer_sdk import errors
from intezer_sdk.api import IntezerApi
from intezer_sdk.api import get_global_api
from intezer_sdk.consts import CodeItemType, AnalysisStatusCode
from intezer_sdk.consts import AnalysisStatusCode
from intezer_sdk.consts import CodeItemType
from intezer_sdk.sub_analysis import SubAnalysis

logger = logging.getLogger(__name__)
Expand All @@ -23,7 +25,8 @@ def __init__(self,
disable_static_unpacking: bool = None,
api: IntezerApi = None,
file_name: str = None,
code_item_type: str = None) -> None:
code_item_type: str = None,
zip_password: str = None) -> None:
if [file_path, file_hash, file_stream].count(None) != 2:
raise ValueError('Choose between file hash, file stream or file path analysis')

Expand All @@ -42,28 +45,45 @@ def __init__(self,
self._file_stream = file_stream
self._file_name = file_name
self._code_item_type = code_item_type
self._zip_password = zip_password
self._report = None
self._api = api or get_global_api()
self._sub_analyses = None
self._root_analysis = None
self._iocs_report = None
self._dynamic_ttps_report = None

if self._file_path and not self._file_name:
self._file_name = os.path.basename(file_path)

if self._zip_password:
if self._file_name:
if not self._file_name.endswith('.zip'):
self._file_name += '.zip'
else:
self._file_name = 'file.zip'

def send(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> None:
wait_timeout: typing.Optional[datetime.timedelta] = None,
**additional_parameters) -> None:
if self.analysis_id:
raise errors.AnalysisHasAlreadyBeenSent()

if self._file_hash:
self.analysis_id = self._api.analyze_by_hash(self._file_hash,
self._disable_dynamic_unpacking,
self._disable_static_unpacking)
self._disable_static_unpacking,
**additional_parameters)
else:
self.analysis_id = self._api.analyze_by_file(self._file_path,
self._file_stream,
disable_dynamic_unpacking=self._disable_dynamic_unpacking,
disable_static_unpacking=self._disable_static_unpacking,
file_name=self._file_name,
code_item_type=self._code_item_type)
code_item_type=self._code_item_type,
zip_password=self._zip_password,
**additional_parameters)

self.status = consts.AnalysisStatusCode.CREATED

Expand Down Expand Up @@ -137,7 +157,7 @@ def get_sub_analyses(self):
self._init_sub_analyses()
return self._sub_analyses

def get_root_analysis(self):
def get_root_analysis(self) -> SubAnalysis:
if self._root_analysis is None and self.analysis_id:
self._init_sub_analyses()
return self._root_analysis
Expand All @@ -159,6 +179,28 @@ def _init_sub_analyses(self):
def download_file(self, path: str):
self._api.download_file_by_sha256(self.result()['sha256'], path)

@property
def iocs(self) -> dict:
self._assert_analysis_finished()
if not self._iocs_report:
self._iocs_report = self._api.get_iocs(self.analysis_id).json()['result']

return self._iocs_report

@property
def dynamic_ttps(self) -> dict:
self._assert_analysis_finished()
if not self._dynamic_ttps_report:
self._dynamic_ttps_report = self._api.get_dynamic_ttps(self.analysis_id).json()['result']

return self._dynamic_ttps_report

def _assert_analysis_finished(self):
if self._is_analysis_running():
raise errors.AnalysisIsStillRunning()
if self.status != AnalysisStatusCode.FINISH:
raise errors.IntezerError('Analysis not finished successfully')


def get_latest_analysis(file_hash: str, api: IntezerApi = None) -> typing.Optional[Analysis]:
api = api or get_global_api()
Expand Down
55 changes: 46 additions & 9 deletions intezer_sdk/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import typing
from http import HTTPStatus
from typing import Optional

import requests
import requests.adapters
Expand Down Expand Up @@ -104,9 +105,10 @@ def _request_with_refresh_expired_access_token(self,

def analyze_by_hash(self,
file_hash: str,
disable_dynamic_unpacking: bool = None,
disable_static_unpacking: bool = None) -> str:
data = self._param_initialize(disable_dynamic_unpacking, disable_static_unpacking)
disable_dynamic_unpacking: Optional[bool],
disable_static_unpacking: Optional[bool],
**additional_parameters) -> str:
data = self._param_initialize(disable_dynamic_unpacking, disable_static_unpacking, **additional_parameters)

data['hash'] = file_hash
response = self._request_with_refresh_expired_access_token(path='/analyze-by-hash', data=data, method='POST')
Expand All @@ -132,14 +134,20 @@ def analyze_by_file(self,
disable_dynamic_unpacking: bool = None,
disable_static_unpacking: bool = None,
file_name: str = None,
code_item_type: str = None) -> typing.Optional[str]:
options = self._param_initialize(disable_dynamic_unpacking, disable_static_unpacking, code_item_type)
code_item_type: str = None,
zip_password: str = None,
**additional_parameters) -> typing.Optional[str]:
options = self._param_initialize(disable_dynamic_unpacking,
disable_static_unpacking,
code_item_type,
zip_password,
**additional_parameters)

if file_stream:
return self._analyze_file_stream(file_stream, file_name, options)

with open(file_path, 'rb') as file_to_upload:
return self._analyze_file_stream(file_to_upload, file_name or os.path.basename(file_path), options)
return self._analyze_file_stream(file_to_upload, file_name, options)

def get_latest_analysis(self, file_hash: str) -> typing.Optional[dict]:
response = self._request_with_refresh_expired_access_token(path='/files/{}'.format(file_hash), method='GET')
Expand All @@ -158,6 +166,20 @@ def get_analysis_response(self, analyses_id: str) -> Response:

return response

def get_iocs(self, analyses_id: str) -> Response:
response = self._request_with_refresh_expired_access_token(path='/analyses/{}/iocs'.format(analyses_id),
method='GET')
raise_for_status(response)

return response

def get_dynamic_ttps(self, analyses_id: str):
response = self._request_with_refresh_expired_access_token(path='/analyses/{}/dynamic-ttps'.format(analyses_id),
method='GET')
raise_for_status(response)

return response

def get_family_info(self, family_id: str) -> typing.Optional[dict]:
response = self._request_with_refresh_expired_access_token('GET', '/families/{}/info'.format(family_id))
if response.status_code == HTTPStatus.NOT_FOUND:
Expand Down Expand Up @@ -225,6 +247,15 @@ def get_sub_analysis_account_related_samples_by_id(self, composed_analysis_id: s

return response.json()['result_url']

def get_sub_analysis_capabilities_by_id(self, composed_analysis_id: str, sub_analysis_id: str) -> str:
response = self._request_with_refresh_expired_access_token(
path='/analyses/{}/sub-analyses/{}/capabilities'.format(composed_analysis_id, sub_analysis_id),
method='POST')

raise_for_status(response)

return response.json()['result_url']

def generate_sub_analysis_vaccine_by_id(self, composed_analysis_id: str, sub_analysis_id: str) -> str:
response = self._request_with_refresh_expired_access_token(
path='/analyses/{}/sub-analyses/{}/generate-vaccine'.format(composed_analysis_id, sub_analysis_id),
Expand Down Expand Up @@ -339,9 +370,11 @@ def set_session(self):
self._session.headers['User-Agent'] = consts.USER_AGENT

@staticmethod
def _param_initialize(disable_dynamic_unpacking: bool = None,
disable_static_unpacking: bool = None,
code_item_type: str = None):
def _param_initialize(disable_dynamic_unpacking: bool,
disable_static_unpacking: bool,
code_item_type: str = None,
zip_password: str = None,
**additional_parameters):
data = {}

if disable_dynamic_unpacking is not None:
Expand All @@ -350,6 +383,10 @@ def _param_initialize(disable_dynamic_unpacking: bool = None,
data['disable_static_extraction'] = disable_static_unpacking
if code_item_type:
data['code_item_type'] = code_item_type
if zip_password:
data['zip_password'] = zip_password

data.update(additional_parameters)

return data

Expand Down
20 changes: 16 additions & 4 deletions intezer_sdk/sub_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

from intezer_sdk.api import IntezerApi
from intezer_sdk.api import get_global_api
from intezer_sdk.operation import Operation
from intezer_sdk.consts import AnalysisStatusCode
from intezer_sdk.operation import Operation


class SubAnalysis:
Expand All @@ -16,6 +16,7 @@ def __init__(self, analysis_id: str, composed_analysis_id: str, sha256: str, sou
self._api = api or get_global_api()
self._code_reuse = None
self._metadata = None
self._capabilities = None
self._operations = {}

@property
Expand All @@ -34,13 +35,16 @@ def find_related_files(self,
family_id: str,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.get_sub_analysis_related_files_by_family_id(self.composed_analysis_id, self.analysis_id, family_id)
result_url = self._api.get_sub_analysis_related_files_by_family_id(self.composed_analysis_id,
self.analysis_id,
family_id)
return self._handle_operation(family_id, result_url, wait, wait_timeout)

def get_account_related_samples(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.get_sub_analysis_account_related_samples_by_id(self.composed_analysis_id, self.analysis_id)
result_url = self._api.get_sub_analysis_account_related_samples_by_id(self.composed_analysis_id,
self.analysis_id)
return self._handle_operation('Account related samples', result_url, wait, wait_timeout)

def generate_vaccine(self,
Expand All @@ -49,6 +53,12 @@ def generate_vaccine(self,
result_url = self._api.generate_sub_analysis_vaccine_by_id(self.composed_analysis_id, self.analysis_id)
return self._handle_operation('Vaccine', result_url, wait, wait_timeout)

def get_capabilities(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.get_sub_analysis_capabilities_by_id(self.composed_analysis_id, self.analysis_id)
return self._handle_operation('Capabilities', result_url, wait, wait_timeout)

def get_strings(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
Expand All @@ -59,7 +69,9 @@ def get_string_related_samples(self,
string_value: str,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.get_string_related_samples_by_id(self.composed_analysis_id, self.analysis_id, string_value)
result_url = self._api.get_string_related_samples_by_id(self.composed_analysis_id,
self.analysis_id,
string_value)
return self._handle_operation(string_value, result_url, wait, wait_timeout)

def _handle_operation(self,
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
requests>=2.22.0,<3
responses==0.13.2
pytest==6.1.2
responses==0.16.0
pytest==6.2.5
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ def rel(*xs):
install_requires=install_requires,
keywords='intezer',
tests_requires=[
'responses == 0.13.2',
'pytest == 6.1.2'
'responses == 0.16.0',
'pytest == 6.2.5'
],
python_requires='!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*',
classifiers=[
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8']
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9']
)
Loading

0 comments on commit c52e763

Please sign in to comment.