1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Utils for piping normalization functions."""
from collections.abc import Callable
from typing import Literal, NamedTuple, Optional
import pandas as pd
from pandera.pandas import DataFrameModel, Field
from pandera.typing.pandas import DataFrame, Series
# Allowed granularity labels; used as ``Date.precision`` and ``Duration.unit``.
Precision = Literal["day", "month", "year"]
class Date(NamedTuple):
    """Partially normalized date range.

    The date is not completely normalized yet, but the day, the month and the
    year of each bound are already separated by ``/``.
    """

    # Lower bound of the range, formatted as d/m/y.
    start_date: str  # d/m/y
    # Upper bound of the range, formatted as d/m/y.
    end_date: str  # d/m/y
    # One of the ``Precision`` labels ("day", "month" or "year").
    precision: Precision
class Duration(NamedTuple):
    """Tuple representing a uniform duration as a count and a unit."""

    # Number of units — presumably a count of `unit`s; confirm with callers.
    nb: int
    # Unit of the duration: one of the ``Precision`` labels.
    unit: Precision
class RawInterventionDataForDateNormalization(DataFrameModel):
    """Schema of the useful columns for normalizing the intervention dates.

    This is the input schema of ``pipe``: it has no ``norm_date`` column yet.
    """

    # NOTE(review): column names look Italian (record id, protocol date,
    # intervention date, year) — confirm the exact meaning with the data owner.
    idscheda: Series[int]
    # Declared optional and nullable.
    data_protocollo: Optional[Series[str]] = Field(nullable=True)  # noqa: UP045
    data_intervento: Series[str]
    anno: Series[int]
    # Declared optional and nullable; annotated as an (int, Precision) pair.
    norm_duration: Optional[tuple[int, Precision]] = Field(nullable=True)  # noqa: UP045
class InterventionDataForDateNormalization(DataFrameModel):
    """Schema of the useful columns while the intervention dates are normalized.

    Same columns as ``RawInterventionDataForDateNormalization`` plus the
    ``norm_date`` column; ``anno`` is typed as ``pd.Int32Dtype`` here. This is
    the schema ``pipe`` validates against and returns.
    """

    idscheda: Series[int]
    # Declared optional and nullable.
    data_protocollo: Optional[Series[str]] = Field(nullable=True)  # noqa: UP045
    data_intervento: Series[str]
    anno: Series[pd.Int32Dtype]
    # Declared optional and nullable; annotated as an (int, Precision) pair.
    norm_duration: Optional[tuple[int, Precision]] = Field(nullable=True)  # noqa: UP045
    # Declared optional and nullable; annotated as a ``Date``.
    norm_date: Optional[Date] = Field(nullable=True)  # noqa: UP045
class InterventionDataForDateNormalizationRowSchema(NamedTuple):
    """Row-level counterpart of ``InterventionDataForDateNormalization``.

    One instance holds the values of a single row. The field order mirrors the
    column order declared in the dataframe schema.
    """

    idscheda: int
    data_protocollo: str
    data_intervento: str
    anno: int
    norm_duration: Duration | None
    # None until a normalization function has produced a date for this row.
    norm_date: Date | None
# Signature of one normalization step: it receives a single row and returns a
# ``Date``, or ``None`` when it does not produce one.
DateProcessor = Callable[
    [InterventionDataForDateNormalizationRowSchema], Date | None
]
def process_if_not_yet(
    row: InterventionDataForDateNormalizationRowSchema, fn: DateProcessor
) -> Date | None:
    """Run ``fn`` on ``row`` only when the row has no normalized date yet.

    ``fn`` attempts to normalize the humanly-input date when it matches one of
    the patterns it supports, and returns ``None`` otherwise.

    Args:
        row: The row to inspect; its ``norm_date`` holds any earlier result.
        fn: The normalization function to try on an unprocessed row.

    Returns:
        The existing ``row.norm_date`` when it is not ``None``; otherwise
        whatever ``fn(row)`` returns.
    """
    already_normalized = row.norm_date
    if already_normalized is None:
        # No earlier function produced a date: give this one its chance.
        return fn(row)
    return already_normalized
def pipe(
    s: DataFrame[RawInterventionDataForDateNormalization],
    functions: tuple[DateProcessor, ...],
) -> DataFrame[InterventionDataForDateNormalization]:
    """Apply a sequence of normalization functions to the raw date frame.

    A ``norm_date`` column filled with ``None`` is added to ``s`` and the
    result is validated against ``InterventionDataForDateNormalization``
    (with ``lazy=True``). The functions are then applied in order, row by row,
    through ``process_if_not_yet``: a row that already has a ``norm_date`` is
    left alone, so the first function that returns a date for a row wins.
    Together the functions try to cover a maximum of humanly-input dates.

    Args:
        s: The raw intervention data.
        functions: Normalization functions, tried in the given order.

    Returns:
        The validated frame with its ``norm_date`` column filled in wherever
        one of the functions produced a date. With no functions, the
        validated frame is returned as is.

    Raises:
        KeyError: If the frame lacks a column named after one of the fields of
            ``InterventionDataForDateNormalizationRowSchema``.
    """
    row_fields = InterventionDataForDateNormalizationRowSchema._fields

    def to_row(
        record: pd.Series,
    ) -> InterventionDataForDateNormalizationRowSchema:
        # Build the row by column label rather than by position, so the
        # result does not depend on the frame's column order and extra
        # columns are ignored instead of breaking the constructor.
        return InterventionDataForDateNormalizationRowSchema(
            **{name: record[name] for name in row_fields}
        )

    def run_step(frame: pd.DataFrame, fn: DateProcessor) -> pd.DataFrame:
        # One pass: try `fn` on every row that has no norm_date yet.
        return frame.assign(
            norm_date=frame.apply(
                lambda record: process_if_not_yet(to_row(record), fn),
                axis=1,
            )
        )

    frame = InterventionDataForDateNormalization.validate(
        s.assign(norm_date=None), lazy=True
    )
    for fn in functions:
        frame = run_step(frame, fn)
    return frame
|