1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use polars_core::prelude::*;
use rayon::prelude::*;
use super::*;
use crate::physical_plan::planner::create_physical_expr;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
pub trait ExprEvalExtension: IntoExpr + Sized {
fn cumulative_eval(self, expr: Expr, min_periods: usize, parallel: bool) -> Expr {
let this = self.into_expr();
let expr2 = expr.clone();
let func = move |mut s: Series| {
let name = s.name().to_string();
s.rename("");
let expr = expr.clone();
let mut arena = Arena::with_capacity(10);
let aexpr = to_aexpr(expr, &mut arena);
let phys_expr = create_physical_expr(aexpr, Context::Default, &arena, None)?;
let state = ExecutionState::new();
let finish = |out: Series| {
if out.len() > 1 {
Err(PolarsError::ComputeError(
format!(
"expected single value, got a result with length: {}, {:?}",
out.len(),
out
)
.into(),
))
} else {
Ok(out.get(0).unwrap().into_static().unwrap())
}
};
let avs = if parallel {
(1..s.len() + 1)
.into_par_iter()
.map(|len| {
let s = s.slice(0, len);
if (len - s.null_count()) >= min_periods {
let df = DataFrame::new_no_checks(vec![s]);
let out = phys_expr.evaluate(&df, &state)?;
finish(out)
} else {
Ok(AnyValue::Null)
}
})
.collect::<PolarsResult<Vec<_>>>()?
} else {
let mut df_container = DataFrame::new_no_checks(vec![]);
(1..s.len() + 1)
.map(|len| {
let s = s.slice(0, len);
if (len - s.null_count()) >= min_periods {
df_container.get_columns_mut().push(s);
let out = phys_expr.evaluate(&df_container, &state)?;
df_container.get_columns_mut().clear();
finish(out)
} else {
Ok(AnyValue::Null)
}
})
.collect::<PolarsResult<Vec<_>>>()?
};
Ok(Series::new(&name, avs))
};
this.apply(
func,
GetOutput::map_field(move |f| {
let dtype = f
.data_type()
.inner_dtype()
.cloned()
.unwrap_or_else(|| f.data_type().clone());
let df = Series::new_empty("", &dtype).into_frame();
#[cfg(feature = "python")]
let out = {
use pyo3::Python;
Python::with_gil(|py| {
py.allow_threads(|| df.lazy().select([expr2.clone()]).collect())
})
};
#[cfg(not(feature = "python"))]
let out = { df.lazy().select([expr2.clone()]).collect() };
match out {
Ok(out) => {
let dtype = out.get_columns()[0].dtype();
Field::new(f.name(), dtype.clone())
}
Err(_) => Field::new(f.name(), DataType::Null),
}
}),
)
.with_fmt("expanding_eval")
}
}
// `Expr` uses the trait's default `cumulative_eval` implementation unchanged,
// so the impl body is intentionally empty.
impl ExprEvalExtension for Expr {}