use crate::analysis::forward_interprocedural_fixpoint::create_computation;
use crate::analysis::graph::{Edge, Graph, HasCfg};
use crate::analysis::interprocedural_fixpoint_generic::NodeValue;
use crate::analysis::pointer_inference::{
Data as PiData, PointerInference as PointerInferenceComputation,
};
use crate::analysis::taint::{state::State as TaState, TaintAnalysis};
use crate::analysis::vsa_results::{HasVsaResult, VsaResult};
use crate::intermediate_representation::*;
use crate::prelude::*;
use crate::utils::{
log::{CweWarning, LogMessage},
symbol_utils,
};
use crate::CweModule;
use petgraph::visit::EdgeRef;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::convert::AsRef;
pub static CWE_MODULE: CweModule = CweModule {
name: "CWE337",
version: "0.1",
run: check_cwe,
};
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)]
pub struct Config {
sources: Vec<String>,
seeding_functions: Vec<String>,
}
pub fn check_cwe(
analysis_results: &AnalysisResults,
cwe_params: &serde_json::Value,
) -> (Vec<LogMessage>, Vec<CweWarning>) {
let project = analysis_results.project;
let config: Config = serde_json::from_value(cwe_params.clone())
.expect("Invalid configuration inside config.json for CWE337.");
let source_map = symbol_utils::get_symbol_map(project, &config.sources[..]);
let sink_map = symbol_utils::get_symbol_map(project, &config.seeding_functions[..]);
if source_map.is_empty() || sink_map.is_empty() {
return (Vec::new(), Vec::new());
}
let pi_result = analysis_results.pointer_inference.unwrap();
let graph = analysis_results.control_flow_graph;
let (cwe_sender, cwe_receiver) = crossbeam_channel::unbounded();
let context = Context {
project: analysis_results.project,
pi_result,
control_flow_graph: graph,
sink_map,
extern_symbol_map: project
.program
.term
.extern_symbols
.iter()
.map(|(tid, sym)| (tid.clone(), sym))
.collect(),
cwe_collector: cwe_sender,
};
let mut computation = create_computation(context, None);
for edge in graph.edge_references() {
let Edge::ExternCallStub(jmp) = edge.weight() else {
continue;
};
let Jmp::Call { target, .. } = &jmp.term else {
continue;
};
let Some(symbol) = source_map.get(target) else {
continue;
};
let return_node = edge.target();
computation.set_node_value(
return_node,
NodeValue::Value(TaState::new_return(symbol, pi_result, return_node)),
);
}
computation.compute_with_max_steps(100);
let mut cwe_warnings = BTreeMap::new();
for cwe in cwe_receiver.try_iter() {
cwe_warnings.insert(cwe.addresses[0].clone(), cwe);
}
let cwe_warnings = cwe_warnings.into_values().collect();
(Vec::new(), cwe_warnings)
}
pub struct Context<'a> {
project: &'a Project,
pi_result: &'a PointerInferenceComputation<'a>,
control_flow_graph: &'a Graph<'a>,
sink_map: HashMap<Tid, &'a ExternSymbol>,
extern_symbol_map: HashMap<Tid, &'a ExternSymbol>,
cwe_collector: crossbeam_channel::Sender<CweWarning>,
}
impl<'a> HasCfg<'a> for Context<'a> {
fn get_cfg(&self) -> &Graph<'a> {
self.control_flow_graph
}
}
impl<'a> HasVsaResult<PiData> for Context<'a> {
fn vsa_result(&self) -> &impl VsaResult<ValueDomain = PiData> {
self.pi_result
}
}
impl<'a> AsRef<Project> for Context<'a> {
fn as_ref(&self) -> &Project {
self.project
}
}
impl<'a> TaintAnalysis<'a> for Context<'a> {
fn update_call_stub(&self, state: &TaState, call: &Term<Jmp>) -> Option<TaState> {
if state.is_empty() {
return None;
}
match &call.term {
Jmp::Call { target, .. } => {
if let Some(sink_symbol) = self.sink_map.get(target) {
if state.check_extern_parameters_for_taint::<true>(
self.vsa_result(),
sink_symbol,
&call.tid,
) {
self.generate_cwe_warning(call, sink_symbol);
None
} else {
Some(self.update_extern_symbol(state, sink_symbol))
}
} else {
let extern_symbol = self
.extern_symbol_map
.get(target)
.expect("Extern symbol not found.");
Some(self.update_extern_symbol(state, extern_symbol))
}
}
Jmp::CallInd { .. } => self.update_call_generic(state, &call.tid, &None),
_ => panic!("Malformed control flow graph encountered."),
}
}
}
impl<'a> Context<'a> {
fn update_extern_symbol(&self, state: &TaState, extern_symbol: &ExternSymbol) -> TaState {
let mut new_state = state.clone();
new_state.remove_non_callee_saved_taint(self.project.get_calling_convention(extern_symbol));
new_state
}
fn generate_cwe_warning(&self, sink_call: &Term<Jmp>, sink_symbol: &ExternSymbol) {
let cwe_warning = CweWarning::new(
CWE_MODULE.name,
CWE_MODULE.version,
format!(
"RNG seed function {} at {} is seeded with predictable seed source.",
sink_symbol.name, sink_call.tid.address,
),
)
.tids(vec![format!("{}", sink_call.tid)])
.addresses(vec![sink_call.tid.address.clone()])
.symbols(vec![sink_symbol.name.clone()]);
let _ = self.cwe_collector.send(cwe_warning);
}
}