module documentation

Helpers with pandas.

Some little helpers for, and depending on, the data science package pandas.

Class DataContainer A container managing multiple pandas.DataFrame.
Function add_missing_category Add category label for missing values to a categorical data series.
Function create_icd_catalog Creates an ICD catalog data frame based on original data from BfArM or WHO.
Function cut_bins Cutting values into bins similar to pandas.cut().
Function cut_bins_via_interval_count Cutting values into specific number of bins of the same range.
Function cut_by_row_keep_group Cut a dataframe into pieces row by row but keep groups together.
Function cut_into_pieces Cut a dataframe horizontally into pieces of (nearly) the same size.
Function parallize_job_on_dataframe Use multiprocessing to work on a dataframe row by row.
Function read_and_validate_csv Read a CSV file with respect to specifications about format and rules about valid values and return a pandas.DataFrame.
Function read_and_validate_excel Read an Excel file with respect to specifications about format and rules about valid values and return a pandas.DataFrame.
Function reorder_columns Rearrange the columns of a DataFrame.
Function validate_csv Validate the structure of a CSV file.
Function validate_dataframe Validate a dataframe's columns and expected values.
Function validate_value_rules_on_dataframe Validate that the DataFrame (df_to_check) fits the value_rules. Otherwise a ValueError is raised.
Constant FILENAME_PREFIX_WRONG Prefix used for the WRONG file. See validate_csv() for details.
Constant SERVER_N_CORES Machines with this number of logical cores are treated as servers or crunching machines used by multiple users. See SERVER_USE_CORE_FX for details.
Constant SERVER_USE_CORE_FX When parallelizing tasks using parallize_job_on_dataframe() on a multi-user server use only this fraction of available cores.
Function _announce_csv_violations No summary
Function _construct_file_path_wrong Constructs a file name suitable for documenting wrong lines.
Function _csv_line_fit_field_length_rules See validate_csv() for details.
Function _explode_rules Convert lists with values and ranges to ranges only.
Function _file_path_as_buffer Using a path or an in-memory file (buffer) with a with statement.
Function _file_path_or_zip_path_as_buffer A workaround to make pandas handle zipfile.Path.
Function _generate_bin_labels Create labels for the associated intervals.
Function _get_original_icd10gm Original WHO ICD-Data as Pandas DataFrames.
Function _parse_specs_and_rules Parse, validate and separate the specs_and_rules argument used in read_and_validate_csv().
Function _replace_lines_and_substrings Replace lines and substrings in the content of a file or buffer.
Function _store_spec_na_values Store the missing value specification of a column.
Variable _log Undocumented
def add_missing_category(data: pandas.Series, missing_label: str, insert_at_end: bool = True) -> pandas.Series: (source)

Add category label for missing values to a categorical data series.

Parameters
data (pandas.Series): The data series (e.g. a column).
missing_label (str): The new label.
insert_at_end (bool): Append the label to the end (default) or insert it in front (False).
Returns
pandas.Series: The data series with the new categorical dtype attached.
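Example

A minimal sketch of typical usage (the label "no answer" is made up for illustration):

data = pandas.Series(['yes', 'no', pandas.NA], dtype='category')
data = add_missing_category(data, missing_label='no answer')
# The new label is now a valid category, e.g. for fillna():
data = data.fillna('no answer')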
def create_icd_catalog(file_or_url: Union[pathlib.Path, str]) -> pandas.DataFrame: (source)

Creates an ICD catalog data frame based on original data from BfArM or WHO.

The data source can be a zip file (download from BfArM or WHO) or a download URL for that zip file.

The result looks like this:

CODE      TYP CHAPTER    BLOCK        TEXT         CODE_TEXT
0    A00.-     CODE      01  A00-A09   Cholera..   A00.- Cholera..
1    A00.0     CODE      01  A00-A09   Cholera..   A00.0 Cholera..
2  A00-A09    BLOCK      01  A00-A09  Infektiö..  A00-A09 Infekt..
3  A15-A19    BLOCK      01  A15-A19  Tuberkul..  A15-A19 Tuberk..
4       01  CHAPTER      01           Bestimmt..  01 Bestimmte i..
5       02  CHAPTER      02           Neubildu..  02 Neubildunge..

Attention

Most of the downloads offer zip archives, but their internal structure differs. This can cause problems with this function because it doesn't know all structures yet. Please open an issue report.

Parameters
file_or_url (Union[pathlib.Path, str]): File path or URL.
Returns
pandas.DataFrame: Data frame with the columns CODE, TYP, CHAPTER, BLOCK, TEXT and CODE_TEXT (see the example above).
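Example

A call could look like this (file name and URL are just placeholders):

catalog = create_icd_catalog(pathlib.Path('icd10gm.zip'))
# or directly from a download URL
catalog = create_icd_catalog('https://example.org/icd10gm.zip')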
def cut_bins(values: Union[list, pandas.Series], infinity_is_less_than: int = 0, interval_range: int = None, infinity_is_equal_or_more_than: int = None, interval_list: Iterable[Tuple] = None, labels: Iterable[str] = None, format_string: str = '{} to {}', begin_format_string: str = 'less than {}', end_format_string: str = '{} and more', remove_unused_labels: bool = False) -> Union[list, pandas.Series]: (source)

Cutting values into bins similar to pandas.cut().

See also cut_bins_via_interval_count() for a simplified wrapper. There are two major methods to specify the bins or intervals.

# Specify steps using "interval_range" to get intervals of
# same length:
cut_bins(the_values, interval_range=10)

# Or using "interval_list" to specify each interval explicitly
# with pairs of start and end values which are both included
# in that interval:
bins = [(0, 4), (5, 9), (10, 50)]
cut_bins(the_values, interval_list=bins)

The interval range arguments are ignored when interval_list is given.

Infinity starts and ends are created automatically when interval_range is used but not with interval_list. For the latter you have to specify them yourself like so:

bins = [(-math.inf, -1), (0, 9), (10, math.inf)]
cut_bins(the_values, interval_list=bins)

The wording of the labels can be modified via format_string and when using infinity ends also via begin_format_string and end_format_string. Or you can set explicit labels via labels.

Parameters
values (Union[list, pandas.Series]): List of values to cut.
infinity_is_less_than (int): The right edge (or end) of the first infinity interval. For example, if 0 the first two intervals are [(-math.inf, -1), (0, ...)].
interval_range (int): Size of each interval.
infinity_is_equal_or_more_than (int): First value covered by the last infinity interval. For example, if 100 the last two intervals are [(..., 99), (100, math.inf)].
interval_list (Iterable[Tuple]): List of pairs of start and end points of intervals. Both points are included in the interval. Overwrites interval_range and related arguments. If one of the values is not covered by that list a ValueError is raised.
labels (Iterable[str]): Explicit list of labels to use.
format_string (str): Used for the in-between intervals.
begin_format_string (str): Used for the first interval including an infinity start.
end_format_string (str): Used for the last interval including an infinity end.
remove_unused_labels (bool): Remove labels from the categories if they don't exist in the values.
Returns
Union[list, pandas.Series]: List of labels corresponding to the values.
Raises
ValueError: If interval_list is used and its length isn't equal to the number of provided labels.
ValueError: If not all values are covered by the bins/intervals (except NA values).
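Example

A sketch of the expected labels using the default format strings (values made up):

bins = [(0, 4), (5, 9), (10, 50)]
cut_bins([3, 7, 12], interval_list=bins)
# -> ['0 to 4', '5 to 9', '10 to 50']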
def cut_bins_via_interval_count(values: Union[list, pandas.Series], interval_range: int, interval_count: int, first_interval_begin: int = 0, **kwargs) -> Union[list, pandas.Series]: (source)

Cutting values into specific number of bins of the same range.

If one of the values is not covered by the resulting intervals (bins) a ValueError is raised. NA values are an exception to that rule. This function is a wrapper around cut_bins().

Parameters
values (Union[list, pandas.Series]): List of values to cut.
interval_range (int): Size of each interval.
interval_count (int): Number of intervals (or bins).
first_interval_begin (int): First value in the first interval.
**kwargs: See cut_bins() for additional arguments.
Returns
Union[list, pandas.Series]: List of labels corresponding to the values.
Raises
ValueError: If not all values are covered by the bins/intervals (except NA values).
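Example

A hedged sketch (values made up); the exact edge handling follows cut_bins():

cut_bins_via_interval_count(
    [5, 17, 28],
    interval_range=10,
    interval_count=3,
    first_interval_begin=0
)
# -> presumably ['0 to 9', '10 to 19', '20 to 29']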
def cut_by_row_keep_group(data: pandas.DataFrame, group_column: Union[str, int], n_pieces: int, sort_kind: str = 'quicksort') -> list[pandas.DataFrame]: (source)

Cut a dataframe into pieces row by row but keep groups together.

Groups are specified by the column name in group_column. The data will be sorted by group_column using the sort algorithm specified by sort_kind, which is handed over to pandas/numpy. The default quicksort is the fastest; for unit testing stable should be used. The number of resulting parts is not guaranteed to be n_pieces.

Parameters
data (pandas.DataFrame): The data frame that should be cut.
group_column (Union[str, int]): Name or index of the group column.
n_pieces (int): Number of resulting pieces.
sort_kind (str): Sort algorithm handed over to pandas/numpy; mainly relevant for unit testing.
Returns
list[pandas.DataFrame]: A list of the data frame parts.
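Example

A minimal sketch (data made up); rows sharing a group value stay in the same piece:

df = pandas.DataFrame({
    'group': ['a', 'a', 'b', 'b', 'c'],
    'value': [1, 2, 3, 4, 5]
})
pieces = cut_by_row_keep_group(df, group_column='group', n_pieces=2)
# Both 'a' rows (and both 'b' rows) end up in the same piece.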
def cut_into_pieces(data: pandas.DataFrame, n_pieces: int) -> list[pandas.DataFrame]: (source)

Cut a dataframe horizontally into pieces of (nearly) the same size.

These pieces can be used for parallelization. To keep groups of rows together there is the alternative cut_by_row_keep_group(). The number of pieces is guaranteed.

Parameters
data (pandas.DataFrame): The data frame that should be cut.
n_pieces (int): Number of resulting pieces.
Returns
list[pandas.DataFrame]: A list of the data frame parts.
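Example

A minimal sketch (data made up):

df = pandas.DataFrame({'value': range(10)})
pieces = cut_into_pieces(df, n_pieces=3)
len(pieces)
# -> 3 (this count is guaranteed)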
def parallize_job_on_dataframe(data: pandas.DataFrame, worker_func: Callable, group_column: str = None, worker_args: tuple = tuple(), n_pieces: int = None, decrease_workers_by: int = 0) -> pandas.DataFrame: (source)

Use multiprocessing to work on a dataframe row by row.

A dataframe is cut into multiple dataframes. Each of them is transferred to another process (not thread). This is fast because multiple CPU cores are used, but it costs a lot of RAM and some time for transferring the dataframe pieces (via pickle) to a process and back.

There are two options to cut a dataframe. By default or when using n_pieces it is cut into pieces with nearly the same number of rows. The function bandas.cut_into_pieces() is used in that case. When using group_column the function bandas.cut_by_row_keep_group() will be used.

A minimal worker just modifies and returns the sub-dataframe:

def the_worker(sub_dataframe):
    # Use item assignment so a column is set, not an attribute.
    sub_dataframe['foo'] = 7

    return sub_dataframe

To pass additional arguments to the worker use the worker_args argument with the values in a tuple. The extra arguments come before the sub-dataframe in the worker's signature:

def the_worker(columns, sub_dataframe):
    # Aggregate per row so the result fits into one column.
    sub_dataframe['Extra'] = sub_dataframe.loc[:, columns] \
        .apply(lambda row: row.sum() * 7, axis=1)

    return sub_dataframe

if __name__ == '__main__':
    result = bandas.parallize_job_on_dataframe(
        data=df,
        worker_func=the_worker,
        group_column='group',
        worker_args=(['colA', 'colD', 'colT'], )
    )
Parameters
data (pandas.DataFrame): The dataframe.
worker_func (Callable): A function executed in each process.
group_column (str): Rows sharing a value in this column are kept together while cutting the dataframe.
worker_args (tuple): Tuple of additional arguments used in the worker function.
n_pieces (int): Number of pieces the dataframe should be cut into.
decrease_workers_by (int): Reduce the number of cores to use by this value.
Returns
pandas.DataFrame: The resulting dataframe.
def read_and_validate_csv(file_path: Union[pathlib.Path, zipfile.Path], specs_and_rules: dict, no_header_line: bool = False, encoding: str = 'utf-8', delimiter: str = ';', replace_lines: Dict[str, str] = None, replace_substrings: Dict[str, str] = None, on_bad_lines: Union[str, Callable] = 'error', **kwargs) -> pandas.DataFrame: (source)

Read a CSV file with respect to specifications about format and rules about valid values and return a pandas.DataFrame.

You have to give specifications for all existing columns in the correct order. The following aspects can be specified:

  • Columns to read and columns to ignore.
  • The data type of a column. Types from pandas, numpy or Python's builtins are valid.
  • Missing values. They are converted to pandas.NA in the resulting data frame.
  • Length of a data field in the raw CSV file.
  • Valid values in a column (checked in the resulting data frame).
  • Ignoring a column is also a specification.

Example

Here you see a complex example with all possible options.

  • ColumnA is of type str.
  • ColumnB is of type str and the value no answer is treated as missing (pandas.NA).
  • ColumnC exists in the *.csv file but will be ignored while reading and won't be part of the resulting data frame.
  • ColumnD is of type pandas.Int16Dtype. The value -9 is a missing value. The field length can be 1, 2 or 4 to 8.
  • Possible or valid values are 0, 1, 3 to 9 and the missing value -9.
specs_and_rules = {
    'ColumnA': 'str',
    'ColumnB': ('str', 'no answer'),
    'ColumnC': None,
    'ColumnD': (
        'Int16',
        -9, {
            'len': [1, 2, (4, 8)],
            'val': [0, 1, (3, 9)]
        }
    )
}

Example

Here we expect a CSV file with three columns, but only ColumnB is in the resulting data frame; the others are ignored while reading. Although the third column ColumnC is not contained in the result, its content will still be validated with a val-rule.

specs_and_rules = {
    'ColumnA': None,
    'ColumnB': 'int',
    'ColumnC': (None, None, {'val': [1]}),
}
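A call with such a specification could look like this (the file name is just a placeholder):

df = read_and_validate_csv(
    file_path=pathlib.Path('data.csv'),
    specs_and_rules=specs_and_rules
)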

Important

Do not use objects of type type when specifying the column type. For example when the column is a string use "str" instead of str.

Hint

To pass arguments through to pandas.read_csv() the **kwargs can be used. For example skiprows or skipfooter.

Hint

The file_path can also be of type zipfile.Path to specify an entry in a ZIP file.

Parameters
file_path (Union[pathlib.Path, zipfile.Path]): Path to the CSV file to read from.
specs_and_rules (dict): A dictionary indexed by column name.
no_header_line (bool): Set to True if the first line does not contain column names.
encoding (str): Optional encoding used for reading the CSV file.
delimiter (str): Delimiter separating the fields.
replace_lines (Dict[str, str]): Complete lines to replace, indexed by the original line. See _replace_lines_and_substrings() for details.
replace_substrings (Dict[str, str]): Substrings to replace within lines, indexed by the original substring. See _replace_lines_and_substrings() for details.
on_bad_lines (Union[str, Callable]): Handed over to pandas.read_csv().
**kwargs: Used to hand over arguments to pandas.read_csv().
Returns
pandas.DataFrame: The resulting data frame.
def read_and_validate_excel(file_path: Union[pathlib.Path, zipfile.Path], specs_and_rules: dict, no_header_line: bool = False, **kwargs) -> pandas.DataFrame: (source)

Read an Excel file with respect to specifications about format and rules about valid values and return a pandas.DataFrame.

You have to give specifications for all existing columns in the correct order. But ignoring a column is also a specification. See read_and_validate_csv() for details and examples about usage of specs_and_rules.

Tip

To pass arguments through to pandas.read_excel() the **kwargs can be used. For example sheet_name, skiprows or skipfooter.
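Example

A call could look like this (file and sheet name are just placeholders):

df = read_and_validate_excel(
    file_path=pathlib.Path('data.xlsx'),
    specs_and_rules=specs_and_rules,
    sheet_name='Sheet1'
)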

def reorder_columns(dataframe: pandas.DataFrame, this_columns: Union[list[str], str], behind_this_column: str = None) -> pandas.DataFrame: (source)

Rearrange the columns of a DataFrame.

The columns named in this_columns are moved behind the column named via behind_this_column.

Parameters
dataframe (pandas.DataFrame): The complete data frame.
this_columns (Union[list[str], str]): List of names or one name of the column(s) to move.
behind_this_column (str): Name of the column before the insertion position.
Returns
pandas.DataFrame: The newly ordered data frame.
Raises
AttributeError: If the column names are not unique.
KeyError: If a column to move does not exist.
ValueError: If the behind column does not exist.
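Example

A minimal sketch (column names made up); move ColumnC directly behind ColumnA:

df = pandas.DataFrame(columns=['ColumnA', 'ColumnB', 'ColumnC'])
df = reorder_columns(df, this_columns='ColumnC', behind_this_column='ColumnA')
list(df.columns)
# -> ['ColumnA', 'ColumnC', 'ColumnB']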
def validate_csv(file_path: Union[pathlib.Path, zipfile.Path], columns: Union[list[str], int], delimiter: str = ';', quoting: int = csv.QUOTE_MINIMAL, encoding: str = 'utf-8', field_length_rules: dict = None, stop_after_n_errors: int = 50, skiprows: int = 0, skipfooter: int = 0, nrows: int = None, replace_lines: Dict[str, str] = None, replace_substrings: Dict[str, str] = None) -> bool: (source)

Validate the structure of a CSV file.

See read_and_validate_csv() for more details. Malformed or invalid rows are logged to a separate file (prefixed with _WRONG_; see FILENAME_PREFIX_WRONG). The need for this function arises from the fact that the checks of pandas.read_csv() are quite sluggish.

Note

Development notes: Maybe move that function to buhtzology.misc because there are no pandas dependencies in it.

Parameters
file_path (Union[pathlib.Path, zipfile.Path]): Path and name of the CSV file to check.
columns (Union[list[str], int]): Names or count of the expected columns.
delimiter (str): Field delimiter.
quoting (int): Quoting dialect.
encoding (str): Used while reading the file.
field_length_rules (dict): Dict indexed by column index with lists of valid field lengths.
stop_after_n_errors (int): Stop checking for further errors or rule violations when this number is reached.
skiprows (int): Skip n rows from the beginning.
skipfooter (int): Skip n rows from the end.
nrows (int): Read nrows from the beginning (including the header) after skipping.
replace_lines (Dict[str, str]): Complete lines to replace, indexed by the original line. See _replace_lines_and_substrings() for details.
replace_substrings (Dict[str, str]): Substrings to replace within lines, indexed by the original substring. See _replace_lines_and_substrings() for details.
Returns
bool: True if everything is fine. Otherwise an exception is raised.
Raises
ValueError: If the header or one or more lines do not fit the rules.
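Example

A hedged sketch (file name and rules made up); expect two columns and allow the first field to be one or two characters long:

validate_csv(
    pathlib.Path('data.csv'),
    columns=['ColumnA', 'ColumnB'],
    field_length_rules={0: [1, 2]}
)
# -> True, or a ValueError with the violating lines written to
#    the _WRONG_ file.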
def validate_dataframe(df_to_check: pandas.DataFrame, expected_columns: Iterable, ignored_columns: Iterable, value_rules: Dict[Hashable, Iterable]): (source)

Validate a dataframe's columns and expected values.

If the dataframe is valid nothing happens; no return value. If something is invalid a TypeError is raised.

Parameters
df_to_check (pandas.DataFrame): The dataframe to perform the validation on.
expected_columns (Iterable): List of column names that should exist.
ignored_columns (Iterable): List of columns excluded from the validation.
value_rules (Dict[Hashable, Iterable]): See read_and_validate_csv() for details.
Raises
TypeError: When the dataframe does not fit the rules.
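Example

A minimal sketch (names and rules made up):

validate_dataframe(
    df_to_check=df,
    expected_columns=['ColumnA', 'ColumnB'],
    ignored_columns=['ColumnC'],
    value_rules={'ColumnA': [0, 1]}
)
# Returns nothing when valid; raises TypeError otherwise.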
def validate_value_rules_on_dataframe(df_to_check, value_rules): (source)

Validate that the DataFrame (df_to_check) fits the value_rules. Otherwise a ValueError is raised.

Parameters
df_to_check (pandas.DataFrame): Dataframe to validate.
value_rules (dict): Dict indexed by column name with valid values.
Raises
ValueError: If the value rules do not fit.
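Example

A minimal sketch (names and rules made up), using the same rule format as in read_and_validate_csv():

value_rules = {'ColumnD': [0, 1, (3, 9)]}
validate_value_rules_on_dataframe(df, value_rules)
# Raises ValueError if a value outside 0, 1 and 3 to 9 is found.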
FILENAME_PREFIX_WRONG: str = (source)

Prefix used for the WRONG file. See validate_csv() for details.

Value
'_WRONG_'
SERVER_N_CORES: int = (source)

Machines with this number of logical cores are treated as servers or crunching machines used by multiple users. See SERVER_USE_CORE_FX for details.

Value
16
SERVER_USE_CORE_FX: float = (source)

When parallelizing tasks using parallize_job_on_dataframe() on a multi-user server use only this fraction of available cores.

This is used to prevent taking all cores and blocking other users. See SERVER_N_CORES for details.

Value
0.25
def _announce_csv_violations(file_path: pathlib.Path, violations: dict, delimiter: str, quoting: int, encoding: str, real_header: list[str], count_err_msg: str): (source)
def _construct_file_path_wrong(file_path: Union[pathlib.Path, zipfile.Path, io.IOBase]) -> pathlib.Path: (source)

Constructs a file name suitable for documenting wrong lines.

Example 1:
Based on /home/user/data.csv the path /home/user/_WRONG_data.csv will be returned.
Example 2:
Based on /foo.zip/foo/bar/data.csv the path /_WRONG_foo.zip.foo_bar_data.csv will be returned.
Parameters
file_path (Union[pathlib.Path, zipfile.Path, io.IOBase]): Path object used as a base for the construction.
Returns
pathlib.Path: The new file path.
def _csv_line_fit_field_length_rules(line: list[str], rules: dict[int, list[int]], columns: list[str]) -> bool: (source)

See validate_csv() for details.

def _explode_rules(rules: list) -> list: (source)

Convert lists with values and ranges to ranges only.

Used by _parse_specs_and_rules(). Ranges are specified as tuples with two elements. Ranges are only allowed with type int.

Parameters
rules (list): A list mixed with single values and value ranges as tuples, e.g. [1, 2, (4, 8), 11].
Returns
list: A list with single values only, e.g. [1, 2, 4, 5, 6, 7, 8, 11].
Raises
ValueError: If a range is invalid, e.g. from 5 to 2.
@contextlib.contextmanager
def _file_path_as_buffer(path_or_buffer: Union[pathlib.Path, io.IOBase], mode: str = 'r', encoding: str = 'utf-8', replace_lines: Dict[str, str] = None, replace_substrings: Dict[str, str] = None) -> io.IOBase: (source)

Using a path or an in-memory file (buffer) with a with statement.
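A minimal sketch (file name made up):

with _file_path_as_buffer(pathlib.Path('data.csv')) as buffer:
    content = buffer.read()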

@contextlib.contextmanager
def _file_path_or_zip_path_as_buffer(file_path, zip_entry_mode: str = 'r', encoding: str = 'utf-8', replace_lines: Dict[str, str] = None, replace_substrings: Dict[str, str] = None): (source)

A workaround to make pandas handle zipfile.Path.

Pandas cannot handle zipfile.Path instances (see https://github.com/pandas-dev/pandas/issues/49906). Here we open it as a byte stream.

def _generate_bin_labels(bins: Iterable, format_string: str = '{} to {}', begin_format_string: str = 'less than {}', end_format_string: str = '{} and more') -> list[str]: (source)

Create labels for the associated intervals.

The function is used by cut_bins(). Two ways exist to give the intervals in the bins argument.

Example

# Pairs of intervals (both ends included)
bins = [
    (0, 4),
    (5, 9),
    (10, 20)
]
result = ['0 to 4', '5 to 9', '10 to 20']

# Or with infinity ends
bins = [
    (-math.inf, 4),
    (5, 9),
    (10, math.inf)
]
result = ['less than 5', '5 to 9', '10 and more']

# As list of edges (only right end included)
bins = [0, 5, 10]
result = ['1 to 5', '6 to 10']

# Or with infinity ends
bins = [-math.inf, 0, 5, 10, math.inf]
result = ['less than 1', '1 to 5', '6 to 10', '11 and more']
Parameters
bins (Iterable): The intervals as a list of bin edges or a list of start-end pairs.
format_string (str): Used for labels like "x to y".
begin_format_string (str): Used for the first label with an infinity begin.
end_format_string (str): Used for the last label with an infinity end.
Returns
list[str]: A list of labels.

def _get_original_icd10gm(file_or_url: Union[pathlib.Path, str]) -> Tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: (source)

Original WHO ICD data as pandas DataFrames.

See create_icd_catalog() for details.

Parameters
file_or_url (Union[pathlib.Path, str]): File path or URL.
Returns
Tuple[pandas.DataFrame, pandas.DataFrame, pandas.DataFrame]: Three data frames for chapters, blocks and codes.
Raises
FileNotFoundError: If the file wasn't found.
requests.exceptions.HTTPError: If there's a problem with the URL.
def _parse_specs_and_rules(specs_and_rules: dict) -> tuple[list, dict, dict, dict, dict]: (source)

Parse, validate and separate the specs_and_rules argument used in read_and_validate_csv().

Parameters
specs_and_rules (dict): The dictionary to process.
Returns
tuple: The parsed specification separated into columns, ignored columns, dtypes, missing values, length rules and value rules.
def _replace_lines_and_substrings(buffer: io.IOBase, replace_lines: Dict[str, str], replace_substrings: Dict[str, str]) -> io.IOBase: (source)

Replace lines and substrings in the content of a file or buffer.

Lines are only replaced as a whole and are not treated like substrings. Line endings (e.g. \n) need to be part of the line to replace. Substrings are searched via the in operator. Line replacement has priority.

Parameters
buffer (io.IOBase): The buffer (file-like object) to read content from.
replace_lines (Dict[str, str]): Indexed by the complete lines to replace.
replace_substrings (Dict[str, str]): Indexed by the substrings replaced in lines.
Returns
io.IOBase: A buffer (file-like object) seeked back to position 0.
def _store_spec_na_values(column_name: str, specs: Any, NA_VALUES: dict) -> dict: (source)

Store the missing value specification of a column.

Called by _parse_specs_and_rules().

Missing values are specified in specs and stored in NA_VALUES, indexed by column_name.

Returns
dict: The updated NA_VALUES dictionary.