Skip to content

Commit

Permalink
Merge pull request #139 from intezer/feat/add-search-alerts-history
Browse files Browse the repository at this point in the history
add alerts history with filters TKT-3002
  • Loading branch information
ofirit authored Mar 27, 2024
2 parents 947b700 + 1774cbb commit 7e71658
Show file tree
Hide file tree
Showing 10 changed files with 371 additions and 59 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.20
_______
- Support getting alerts history with filters from server.

1.19.17
_______
- Add sandbox_machine_type to FileAnalysis
Expand Down
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,25 @@ for analyse in history_results:
print(analyse)
```

### Get alert by id
### Alerts
#### Get alert by id
```python
alert = Alert.from_id(alert_id=alert_id,
fetch_scans=False,
wait=False)
```

#### Alerts History

```python
history_results = query_file_analyses_history(
api = <IntezerApi>,
**filters
)
for analyse in history_results:
print(analyse)
```

## Code examples
You can find more code examples under [analyze-python-sdk/examples/](https://github.com/intezer/analyze-python-sdk/tree/master/examples) directory

21 changes: 21 additions & 0 deletions examples/get_alerts_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import datetime
import sys
from pprint import pprint

from intezer_sdk.alerts import query_alerts_history
from intezer_sdk import api


def alerts_history_example(start_date: datetime.datetime, end_date: datetime.datetime):
results = query_alerts_history(start_time=start_date, end_time=end_date)
for result in results:
pprint(result)


def main(args):
api.set_global_api('<api_key>')
alerts_history_example(**args)


if __name__ == '__main__':
main(sys.argv[1:])
2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.19.18'
__version__ = '1.20'
192 changes: 192 additions & 0 deletions intezer_sdk/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import datetime
from io import BytesIO
from typing import Any
from typing import BinaryIO

import requests
Expand All @@ -14,6 +15,7 @@
from typing import Optional

from intezer_sdk._api import IntezerApi
from intezer_sdk.alerts_results import AlertsHistoryResult
from intezer_sdk.analysis import FileAnalysis
from intezer_sdk.analysis import UrlAnalysis
from intezer_sdk.endpoint_analysis import EndpointAnalysis
Expand All @@ -22,6 +24,11 @@
from intezer_sdk.api import get_global_api
from intezer_sdk import errors
from intezer_sdk import consts
from intezer_sdk.util import add_filter

DEFAULT_LIMIT = 100
DEFAULT_OFFSET = 0
ALERTS_SEARCH_REQUEST = '/alerts/search'


def get_alerts_by_alert_ids(alert_ids: List[str],
Expand All @@ -40,6 +47,191 @@ def get_alerts_by_alert_ids(alert_ids: List[str],
return result['alerts_count'], result['alerts']


def generate_alerts_history_search_filters(*,
start_time: datetime.datetime = None,
end_time: datetime.datetime = None,
environments: List[str] = None,
offset: int = None,
limit: int = None,
sources: List[str] = None,
risk_categories: List[str] = None,
alert_verdicts: List[str] = None,
family_names: List[str] = None,
response_statuses: List[str] = None,
hostnames: List[str] = None,
free_text: str = None,
site_name: str = None,
account_name: str = None,
exclude_alert_ids: List[str] = None,
usernames: List[str] = None,
file_hashes: List[str] = None,
process_commandlines: List[str] = None,
sort_by: List[str] = None,
is_mitigated: bool = None,
email_sender: str = None,
email_recipient: str = None,
email_subject: str = None,
email_cc: str = None,
email_bcc: str = None,
email_message_id: str = None,
email_reported_by: str = None,
device_private_ips: List[str] = None,
device_external_ips: List[str] = None,
device_ids: List[str] = None,
time_filter_type: List[str] = None,
sort_order: str = None) -> Dict[str, Any]:
filters = {}
if start_time:
filters['start_time'] = start_time.timestamp()
if end_time:
filters['end_time'] = end_time.timestamp()
add_filter(filters, 'environments', environments)
add_filter(filters, 'offset', offset)
add_filter(filters, 'limit', limit)
add_filter(filters, 'sources', sources)
add_filter(filters, 'risk_categories', risk_categories)
add_filter(filters, 'alert_verdicts', alert_verdicts)
add_filter(filters, 'family_names', family_names)
add_filter(filters, 'response_statuses', response_statuses)
add_filter(filters, 'hostnames', hostnames)
add_filter(filters, 'free_text', free_text)
add_filter(filters, 'site_name', site_name)
add_filter(filters, 'account_name', account_name)
add_filter(filters, 'exclude_alert_ids', exclude_alert_ids)
add_filter(filters, 'usernames', usernames)
add_filter(filters, 'file_hashes', file_hashes)
add_filter(filters, 'process_commandlines', process_commandlines)
add_filter(filters, 'sort_by', sort_by)
add_filter(filters, 'is_mitigated', is_mitigated)
add_filter(filters, 'email_sender', email_sender)
add_filter(filters, 'email_recipient', email_recipient)
add_filter(filters, 'email_subject', email_subject)
add_filter(filters, 'email_cc', email_cc)
add_filter(filters, 'email_bcc', email_bcc)
add_filter(filters, 'email_message_id', email_message_id)
add_filter(filters, 'email_reported_by', email_reported_by)
add_filter(filters, 'device_private_ips', device_private_ips)
add_filter(filters, 'device_external_ips', device_external_ips)
add_filter(filters, 'device_ids', device_ids)
add_filter(filters, 'time_filter_type', time_filter_type)
add_filter(filters, 'sort_order', sort_order)

return filters


def query_alerts_history(*,
start_time: datetime.datetime = None,
end_time: datetime.datetime = None,
api: IntezerApiClient = None,
environments: List[str] = None,
offset: int = DEFAULT_OFFSET,
limit: int = DEFAULT_LIMIT,
sources: List[str] = None,
risk_categories: List[str] = None,
alert_verdicts: List[str] = None,
family_names: List[str] = None,
response_statuses: List[str] = None,
hostnames: List[str] = None,
free_text: str = None,
site_name: str = None,
account_name: str = None,
exclude_alert_ids: List[str] = None,
usernames: List[str] = None,
file_hashes: List[str] = None,
process_commandlines: List[str] = None,
sort_by: List[str] = None,
is_mitigated: bool = None,
email_sender: str = None,
email_recipient: str = None,
email_subject: str = None,
email_cc: str = None,
email_bcc: str = None,
email_message_id: str = None,
email_reported_by: str = None,
device_private_ips: List[str] = None,
device_external_ips: List[str] = None,
device_ids: List[str] = None,
time_filter_type: List[str] = None,
sort_order: str = None) -> AlertsHistoryResult:
"""
Query for alerts history with query param.
:param environments: Query alerts only from these environments.
:param offset: Offset to start querying from - used for pagination.
:param limit: Maximum number of alerts to return - used for pagination.
:param start_time: Query alerts that were created after this timestamp (in UTC).
:param end_time: Query alerts that were created before this timestamp (in UTC).
:param api: Instance of Intezer API for request server.
:param sources: Query alerts only with these sources.
:param risk_categories: Query alerts only with these risk categories.
:param alert_verdicts: Query alerts only with these alert verdicts.
:param family_names: Query alerts only with these family names.
:param response_statuses: Query alerts only with these response statuses.
:param hostnames: Query alerts only with these hostnames.
:param free_text: Query alerts that contain this text in the following fields: family name, hostname, alert verdict.
:param site_name: Query alerts only with this site name.
:param account_name: Query alerts only with this account name.
:param exclude_alert_ids: Query alerts that do not have these alert ids.
:param usernames: Query alerts only with these usernames.
:param file_hashes: Query alerts only with these file hashes.
:param process_commandlines: Query alerts only with these process commandlines.
:param is_mitigated: Query alerts only with this is_mitigated value.
:param email_sender: Query alerts only with these email sender.
:param email_recipient: Query alerts only with these email recipient.
:param email_subject: Query alerts only with this email subject.
:param email_cc: Query alerts only with this email cc.
:param email_bcc: Query alerts only with this email bcc.
:param email_message_id: Query alerts only with this email message id.
:param email_reported_by: Query alerts only with this email reported by.
:param device_private_ips: Query alerts only with these private ips.
:param device_external_ips: Query alerts only with these external ips.
:param device_ids: Query alerts only with these device ids.
:param time_filter_type: The time value to filter alerts by (creation_time / triage_time / triage_change_time / triage_or_triage_change_time).
:param sort_order: The order to sort the alerts by (asc / desc).
:param sort_by: Sort alerts only with this sort_by_key value (CREATION_TIME / TRIAGE_TIME / TRIAGE_CHANGE_TIME ).
:return: Alert query result from server as Results iterator.
"""
api = api or get_global_api()
api.assert_any_on_premise()
filters = generate_alerts_history_search_filters(
start_time=start_time,
end_time=end_time,
environments=environments,
offset=offset,
limit=limit,
sources=sources,
risk_categories=risk_categories,
alert_verdicts=alert_verdicts,
family_names=family_names,
response_statuses=response_statuses,
hostnames=hostnames,
free_text=free_text,
site_name=site_name,
account_name=account_name,
exclude_alert_ids=exclude_alert_ids,
usernames=usernames,
file_hashes=file_hashes,
process_commandlines=process_commandlines,
sort_by=sort_by,
is_mitigated=is_mitigated,
email_sender=email_sender,
email_recipient=email_recipient,
email_subject=email_subject,
email_cc=email_cc,
email_bcc=email_bcc,
email_message_id=email_message_id,
email_reported_by=email_reported_by,
device_private_ips=device_private_ips,
device_external_ips=device_external_ips,
device_ids=device_ids,
time_filter_type=time_filter_type,
sort_order=sort_order
)

return AlertsHistoryResult(ALERTS_SEARCH_REQUEST, api, filters)


class Alert:
"""
The Alert class is used to represent an alert from the Intezer Analyze API.
Expand Down
30 changes: 30 additions & 0 deletions intezer_sdk/alerts_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import List
from typing import Dict
from typing import Tuple

from intezer_sdk.api import IntezerApiClient
from intezer_sdk.api import raise_for_status
from intezer_sdk.history_results import HistoryResult


class AlertsHistoryResult(HistoryResult):

def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict):
"""
Fetch all alerts history results from server.
"""
super().__init__(request_url_path, api, filters)

def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]:
"""
Request from server filtered alerts history.
:param url_path: Url to request new data from.
:param data: filtered data.
:return: Count of all results exits in filtered request and amount
alerts as requested.
"""
response = self._api.request_with_refresh_expired_access_token(
path=url_path, method='POST', data=data)
raise_for_status(response)
data_response = response.json()['result']
return data_response['alerts_count'], data_response['alerts']
61 changes: 4 additions & 57 deletions intezer_sdk/analyses_results.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,20 @@
from typing import List
from typing import Any
from typing import Dict
from typing import Tuple

from intezer_sdk.api import IntezerApiClient
from intezer_sdk.api import raise_for_status
from intezer_sdk.history_results import HistoryResult


class AnalysesHistoryResult:
class AnalysesHistoryResult(HistoryResult):
def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict):
"""
Fetch all analyses history results from server.
:param request_url_path: Url to request new filter from.
:param api: Instance of Intezer API for request server.
:param filters: Filters requested from server.
"""
self._api = api
self.filters: Dict = filters
self._pages: List[Any] = []
self._current_page: List[Any] = []
self._request_url_path: str = request_url_path
self._total_count: int = 0
self._current_offset: int = 0

def __iter__(self):
"""Iterate between page."""
if not self._current_page:
self._fetch_page()
if self._current_page:
yield from self._current_page
if len(self._pages) * self.filters['limit'] < self._total_count:
self._fetch_page()
yield from iter(self)

def __len__(self) -> int:
"""Amount of results fetched currently."""
return self._total_count

@property
def current_page(self) -> list:
"""Get current page, if not exits, ask a new one from server."""
return self._current_page or self._fetch_page()

def all(self) -> list:
"""List all remaining and exists analysis's from server."""
results = list(self)
if self._pages:
self._current_page = self._pages[0]
return results

def _fetch_page(self) -> list:
"""Request for new page from server."""
self.filters['offset'] = self._current_offset
self._total_count, new_page = self._fetch_analyses_history(
self._request_url_path, self.filters)

if new_page:
self._current_page = new_page
self._update_current_page_metadata()
return self._current_page

def _update_current_page_metadata(self):
"""Update all metadata about new current page."""
self._current_offset += len(self._current_page)
self._pages.append(self._current_page)
super().__init__(request_url_path, api, filters)

def _fetch_analyses_history(self, url_path: str, data: Dict
) -> Tuple[int, List]:
def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]:
"""
Request from server filtered analyses history.
:param url_path: Url to request new data from.
Expand Down
Loading

0 comments on commit 7e71658

Please sign in to comment.