MEGR-APT: A Memory-Efficient APT Hunting System Based on Attack Representation Learning
Year: 2024, Author: Department of Computer Science and Software Engineering, Concordia University
Abstract:
The stealthy and persistent nature of Advanced Persistent Threats (APTs) makes them one of the most challenging cyber threats to uncover. Several systems adopted the development of provenance-graph-based security solutions to capture this persistent nature. Provenance graphs (PGs) represent system audit logs by connecting system entities using causal relations and information flows. Hunting APTs demands the processing of ever-growing large-scale PGs of audit logs for a wide range of activities over months or years, i.e., multiterabyte graphs. Existing APT hunting systems are typically memory-based, which suffer from colossal memory consumption, or disk-based, which suffer from performance hits. Therefore, these systems are hard to scale in terms of graph size or time performance. In this paper, we propose MEGR-APT, a scalable APT hunting system to discover suspicious subgraphs matching an attack scenario (query graph) published in Cyber Threat Intelligence (CTI) reports. MEGR-APT hunts APTs in a twofold process: (i) memory-efficient extraction of suspicious subgraphs as search queries over a graph database, and (ii) fast subgraph matching based on graph neural network (GNN) and our effective attack representation learning. We compared MEGR-APT with state-of-the-art (SOTA) APT systems using popular APT benchmarks, such as DARPA TC3 and OpTC. We also tested it using a real enterprise dataset. MEGR-APT achieves an order of magnitude reduction in memory consumption while achieving comparable performance to SOTA in terms of time and accuracy.
Record:
Normally I only open a dedicated page for a paper when I find it genuinely good, and I actually have not read this one in full yet;
but it ships with source code, which is a huge win.
First, the part I need most right now: building the provenance graph for the DARPA TC E3 dataset. Based on this paper's source code for processing TA1-cadets (for an exploration of the data itself, see the DARPA TC part of the "Dataset of CyberSecurity" post on this blog), let's walk through the specifics:
```python
df_events = pd.read_sql(query_events, db_url,
                        params={"start_timestamp": provenance_graph_start,
                                "end_timestamp": provenance_graph_end})
df_events['type'] = [event.split("EVENT_")[1].lower() if event else None
                     for event in df_events["type"]]
```

The authors pre-load the dataset into a SQL database. We will set the database setup aside for now and look only at the extraction step: the snippet above pulls every event within the specified time window from the database into a DataFrame, then strips the `EVENT_` prefix from each event type. The full query is:

```SQL
SELECT "subject" AS subject, "predicate_object" AS object, "uuid" AS event,
       "type", "time_stamp_nanos" AS timestamp
FROM public."Event"
WHERE "time_stamp_nanos" BETWEEN %(start_timestamp)s AND %(end_timestamp)s
  AND uuid IS NOT NULL
  AND "subject" IS NOT NULL
  AND "predicate_object" IN (
    SELECT f.uuid
    FROM public."FileObject" AS f
    INNER JOIN public."Event" AS e ON f.uuid = e.predicate_object
    WHERE time_stamp_nanos BETWEEN %(start_timestamp)s AND %(end_timestamp)s
      AND f.uuid IS NOT NULL
      AND predicate_object_path IS NOT NULL
      AND predicate_object_path != '<unknown>'
    GROUP BY 1
    UNION SELECT DISTINCT "uuid" FROM public."NetflowObject" WHERE uuid IS NOT NULL
    UNION SELECT DISTINCT "uuid" FROM public."UnnamedPipeObject" WHERE uuid IS NOT NULL
    UNION SELECT DISTINCT "uuid" FROM public."SrcSinkObject" WHERE uuid IS NOT NULL
    UNION SELECT DISTINCT "uuid" FROM public."Subject" WHERE uuid IS NOT NULL
  )
UNION
SELECT "subject" AS subject, "predicate_object_2" AS object, "uuid" AS event,
       "type", "time_stamp_nanos" AS timestamp
FROM public."Event"
WHERE "time_stamp_nanos" BETWEEN %(start_timestamp)s AND %(end_timestamp)s
  AND uuid IS NOT NULL
  AND "subject" IS NOT NULL
  AND "predicate_object_2" IN (
    SELECT f.uuid
    FROM public."FileObject" AS f
    INNER JOIN public."Event" AS e ON f.uuid = e.predicate_object_2
    WHERE time_stamp_nanos BETWEEN %(start_timestamp)s AND %(end_timestamp)s
      AND f.uuid IS NOT NULL
      AND predicate_object_path_2 IS NOT NULL
      AND predicate_object_path_2 != '<unknown>'
    GROUP BY 1
    UNION SELECT DISTINCT "uuid" FROM public."NetflowObject" WHERE uuid IS NOT NULL
    UNION SELECT DISTINCT "uuid" FROM public."UnnamedPipeObject" WHERE uuid IS NOT NULL
    UNION SELECT DISTINCT "uuid" FROM public."SrcSinkObject" WHERE uuid IS NOT NULL
    UNION SELECT DISTINCT "uuid" FROM public."Subject" WHERE uuid IS NOT NULL
  )
```

In short, the query keeps only events whose `subject` is set and whose object (`predicate_object` or `predicate_object_2`) resolves to a known entity: a `FileObject` with a usable path, or any `NetflowObject`, `UnnamedPipeObject`, `SrcSinkObject`, or `Subject`.
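With the events extracted, the provenance graph itself can be assembled by treating each row as one causal edge from `subject` to `object`, keyed by the event `uuid`. The following is only a minimal sketch of that idea: the toy `df_events` mimics the columns returned by the query above, and the choice of `networkx.MultiDiGraph` is my assumption for illustration, not necessarily the structure MEGR-APT builds internally.

```python
import pandas as pd
import networkx as nx

# Toy stand-in for the DataFrame the query above would return.
df_events = pd.DataFrame({
    "subject":   ["proc-1", "proc-1", "proc-2"],
    "object":    ["file-a", "sock-b", "file-a"],
    "event":     ["e1", "e2", "e3"],
    "type":      ["EVENT_READ", "EVENT_CONNECT", None],
    "timestamp": [100, 200, 300],
})

# Strip the "EVENT_" prefix as the source snippet does, guarding against NULL types.
df_events["type"] = [t.split("EVENT_")[1].lower() if t else None
                     for t in df_events["type"]]

# One directed edge per audit event; a MultiDiGraph keeps parallel edges
# between the same subject/object pair (repeated events), which a plain
# DiGraph would collapse.
pg = nx.MultiDiGraph()
for row in df_events.itertuples(index=False):
    pg.add_edge(row.subject, row.object,
                key=row.event, type=row.type, timestamp=row.timestamp)

print(pg.number_of_nodes(), pg.number_of_edges())  # prints: 4 3
```

Node attributes (process name, file path, IP address, etc.) would then be attached from the corresponding entity tables (`Subject`, `FileObject`, `NetflowObject`, ...), which is exactly why the query restricts objects to UUIDs that appear in those tables.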