Commit b8eb8f92 authored by Shengpu Tang (tangsp)

experiments: update data extraction

parent e3fc4342
# FIDDLE
FIDDLE – <b>F</b>lex<b>I</b>ble <b>D</b>ata-<b>D</b>riven pipe<b>L</b>in<b>E</b> – is a preprocessing pipeline that transforms structured EHR data into feature vectors that can be used with ML algorithms, relying on a small number of user-defined arguments.
FIDDLE – <b>F</b>lex<b>I</b>ble <b>D</b>ata-<b>D</b>riven pipe<b>L</b>in<b>E</b> – is a preprocessing pipeline that transforms structured EHR data into feature vectors that can be used with ML algorithms, relying on only a small number of user-defined arguments.
Required packages and versions are listed in `requirements.txt`. Older versions may still work but have not been tested.
Requires python 3.6 or above. Required packages and versions are listed in `requirements.txt`. Older versions may still work but have not been tested.
## Usage Notes
FIDDLE generates feature vectors based on data within the observation period $`t\in[0,T]`$. This feature representation can be used to make predictions of adverse outcomes at $`t=T`$. More specifically, FIDDLE outputs a set of binary feature vectors for each example $`i`$, $`\{(s_i,x_i)\ \text{for}\ i=1 \dots N\}`$, where $`s_i \in \mathbb{R}^d`$ contains time-invariant features and $`x_i \in \mathbb{R}^{L \times D}`$ contains time-dependent features.
......@@ -41,3 +41,7 @@ python -m FIDDLE.run \
--theta_1=0.001 --theta_2=0.001 --theta_freq=1 \
--stats_functions 'min' 'max' 'mean'
```
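For reference, the generated feature tensors can be loaded back for downstream modeling. The following is a minimal sketch, not part of FIDDLE itself: it assumes the outputs were saved with the pydata `sparse` package as `X.npz` (shape $`N \times L \times D`$) and `s.npz` (shape $`N \times d`$), with feature names stored one per line in `X.feature_names.txt` and `s.feature_names.txt`.
```python
import sparse  # pydata/sparse; assumed here because X is described as a sparse tensor

output_dir = 'output/'  # illustrative path; use the directory FIDDLE wrote to

# Time-dependent features: sparse binary tensor of shape (N, L, D)
X = sparse.load_npz(output_dir + 'X.npz')
# Time-invariant features: assumed to be saved alongside as s.npz, shape (N, d)
s = sparse.load_npz(output_dir + 's.npz')

# Feature names, assumed to be stored one per line
with open(output_dir + 'X.feature_names.txt') as f:
    X_names = [line.rstrip('\n') for line in f]
with open(output_dir + 's.feature_names.txt') as f:
    s_names = [line.rstrip('\n') for line in f]

assert X.shape[2] == len(X_names) and s.shape[1] == len(s_names)
```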
## Experiments
To demonstrate the flexibility and utility of FIDDLE, we conducted several experiments using data from MIMIC-III. The code to reproduce the results is located in the `mimic3_experiments` subdirectory.
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"outcome=ARF,T=4.0,dt=1.0 5034261\n",
"<class 'pandas.core.frame.DataFrame'>\n",
"RangeIndex: 5034261 entries, 0 to 5034260\n",
"Data columns (total 4 columns):\n",
"ID int64\n",
"t float64\n",
"variable_name object\n",
"variable_value object\n",
"dtypes: float64(1), int64(1), object(2)\n",
"memory usage: 153.6+ MB\n",
"\n",
"outcome=ARF,T=12.0,dt=1.0 11967172\n",
"<class 'pandas.core.frame.DataFrame'>\n",
"RangeIndex: 11967172 entries, 0 to 11967171\n",
"Data columns (total 4 columns):\n",
"ID int64\n",
"t float64\n",
"variable_name object\n",
"variable_value object\n",
"dtypes: float64(1), int64(1), object(2)\n",
"memory usage: 365.2+ MB\n",
"\n",
"outcome=Shock,T=4.0,dt=1.0 6747035\n",
"<class 'pandas.core.frame.DataFrame'>\n",
"RangeIndex: 6747035 entries, 0 to 6747034\n",
"Data columns (total 4 columns):\n",
"ID int64\n",
"t float64\n",
"variable_name object\n",
"variable_value object\n",
"dtypes: float64(1), int64(1), object(2)\n",
"memory usage: 205.9+ MB\n",
"\n",
"outcome=Shock,T=12.0,dt=1.0 16547927\n",
"<class 'pandas.core.frame.DataFrame'>\n",
"RangeIndex: 16547927 entries, 0 to 16547926\n",
"Data columns (total 4 columns):\n",
"ID int64\n",
"t float64\n",
"variable_name object\n",
"variable_value object\n",
"dtypes: float64(1), int64(1), object(2)\n",
"memory usage: 505.0+ MB\n",
"\n",
"benchmark,outcome=mortality,T=48.0,dt=1.0 46945544\n",
"<class 'pandas.core.frame.DataFrame'>\n",
"RangeIndex: 46945544 entries, 0 to 46945543\n",
"Data columns (total 4 columns):\n",
"ID int64\n",
"t float64\n",
"variable_name object\n",
"variable_value object\n",
"dtypes: float64(1), int64(1), object(2)\n",
"memory usage: 1.4+ GB\n",
"\n"
]
}
],
"source": [
"for d in [\n",
" 'outcome=ARF,T=4.0,dt=1.0',\n",
" 'outcome=ARF,T=12.0,dt=1.0',\n",
" 'outcome=Shock,T=4.0,dt=1.0',\n",
" 'outcome=Shock,T=12.0,dt=1.0',\n",
" 'benchmark,outcome=mortality,T=48.0,dt=1.0',\n",
"]:\n",
" df = pd.read_pickle('../data/processed/features/' + d + '/input_data.p')\n",
" print(d, len(df))\n",
" df.info()\n",
" print()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
......@@ -118,7 +118,7 @@
"output_type": "stream",
"text": [
"======\n",
"prediction time 4 hour\n",
"prediction time 4.0 hour\n",
"Exclude deaths 23499\n",
"Exclude discharges 23401\n",
"---\n",
......@@ -128,7 +128,7 @@
"Outcome Shock\n",
"Exclude onset 19342\n",
"======\n",
"prediction time 12 hour\n",
"prediction time 12.0 hour\n",
"Exclude deaths 23319\n",
"Exclude discharges 23060\n",
"---\n",
......@@ -141,7 +141,7 @@
}
],
"source": [
"for T in [4, 12]:\n",
"for T in [4.0, 12.0]:\n",
" print('======')\n",
" print('prediction time', T, 'hour')\n",
"\n",
......@@ -166,6 +166,7 @@
" .set_index('index').copy()\n",
" pop = pop[(pop['{}_ONSET_HOUR'.format(task)] >= T) | pop['{}_ONSET_HOUR'.format(task)].isnull()]\n",
" pop['{}_LABEL'.format(task)] = pop['{}_ONSET_HOUR'.format(task)].notnull().astype(int)\n",
" pop = pop.rename(columns={'ICUSTAY_ID': 'ID'})\n",
" pop.to_csv(data_path + 'population/{}_{}h.csv'.format(task, T), index=False)\n",
"\n",
" # Construct boolean mask\n",
......@@ -195,7 +196,7 @@
"output_type": "stream",
"text": [
"======\n",
"prediction time 48 hour\n",
"prediction time 48.0 hour\n",
"Exclude deaths 22776\n",
"Exclude discharges 11695\n",
"---\n",
......@@ -205,7 +206,7 @@
}
],
"source": [
"for T in [48]:\n",
"for T in [48.0]:\n",
" print('======')\n",
" print('prediction time', T, 'hour')\n",
"\n",
......@@ -223,6 +224,7 @@
" print('Outcome', task)\n",
" examples['{}_LABEL'.format(task)] = examples.HOSPITAL_EXPIRE_FLAG\n",
" pop = examples[['ICUSTAY_ID', '{}_LABEL'.format(task)]]\n",
" pop = pop.rename(columns={'ICUSTAY_ID': 'ID'})\n",
" pop.to_csv(data_path + 'population/{}_{}h.csv'.format(task, T), index=False)\n",
" print('Exclude onset', len(pop))"
]
......@@ -251,7 +253,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
"version": "3.5.2"
}
},
"nbformat": 4,
......
......@@ -5,5 +5,10 @@ with open(os.path.join(os.path.dirname(__file__), '../config.yaml')) as f:
data_path = os.path.join(os.path.dirname(__file__), config['data_path'])
mimic3_path = os.path.join(os.path.dirname(__file__), config['mimic3_path'])
ID_col = config['column_names']['ID']
t_col = config['column_names']['t']
var_col = config['column_names']['var_name']
val_col = config['column_names']['var_value']
parallel = True
n_jobs = 72
"""
python prepare_input.py
"""
import os, yaml
with open(os.path.join(os.path.dirname(__file__), '../config.yaml')) as f:
config = yaml.full_load(f)
data_path = os.path.join(os.path.dirname(__file__), config['data_path'])
parallel = True
ID_col = config['column_names']['ID']
t_col = config['column_names']['t']
var_col = config['column_names']['var_name']
val_col = config['column_names']['var_value']
import argparse
import pickle
import pandas as pd
import numpy as np
from tqdm import tqdm
from joblib import Parallel, delayed
from config import parallel, data_path, ID_col, t_col, var_col, val_col
def main():
parser = argparse.ArgumentParser(description='')
parser.add_argument('--outcome', type=str, required=True)
parser.add_argument('--T', type=int, required=True)
parser.add_argument('--T', type=float, required=True)
parser.add_argument('--dt', type=float, required=True)
args = parser.parse_args()
task = args.outcome
outcome = args.outcome
T = args.T
dt = args.dt
print('Preparing pipeline input for: outcome={}, T={}, dt={}'.format(task, T, dt))
print('Preparing pipeline input for: outcome={}, T={}, dt={}'.format(outcome, T, dt))
import pathlib
pathlib.Path(data_path, 'features', 'outcome={}.T={}.dt={}'.format(task, T, dt)) \
pathlib.Path(data_path, 'features', 'outcome={},T={},dt={}'.format(outcome, T, dt)) \
.mkdir(parents=True, exist_ok=True)
# Load in study population
population = pd.read_csv(data_path + 'population/{}_{}h.csv'.format(task, T)) \
population = pd.read_csv(data_path + 'population/{}_{}h.csv'.format(outcome, T)) \
.rename(columns={'ICUSTAY_ID': 'ID'}).set_index('ID')[[]]
# Load in raw data (from prepare.py)
with open(data_path + 'formatted/all_data.stacked.p', 'rb') as f:
data = pickle.load(f)
# with open(data_path + 'formatted/reformatted_data.T={}.dt={}.p'.format(T, 0.5), 'rb') as f:
# data = pickle.load(f)
####### TODO: Refactor: resample continuous, resolve duplicates (discrete & continuous)
# data = resolve_duplicates_discrete_old(data)
# data = resample_continuous_events_old(data, T, dt) # includes filtering by time window
# data = filter_prediction_time(data, T)
# data = resample_continuous_events(data, T, dt)
# data = resolve_duplicates(data)
# Resample continuous, resolve duplicates (discrete & continuous)
data = resolve_duplicates_discrete(data)
data = filter_prediction_time(data, T)
data = resample_continuous_events(data, T, dt)
......@@ -68,7 +49,7 @@ def main():
assert set(df_data['ID'].unique()) == set(population.index)
# Save
df_data.to_pickle(data_path + 'features/outcome={}.T={}.dt={}/input_data.p'.format(task, T, dt))
df_data.to_pickle(data_path + 'features/outcome={},T={},dt={}/input_data.p'.format(outcome, T, dt))
################################
......@@ -176,7 +157,7 @@ def resolve_duplicates_discrete(data):
return df_v_out
if parallel:
df_dedup = Parallel(n_jobs=20)(delayed(_resolve_duplicates_impl)(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name')))
df_dedup = Parallel(n_jobs=20, verbose=10)(delayed(_resolve_duplicates_impl)(v, df_v) for v, df_v in all_dups.groupby('variable_name'))
else:
df_dedup = [_resolve_duplicates_impl(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name'))]
......@@ -251,7 +232,7 @@ def resolve_duplicates_continuous(data):
return df_v_out
if parallel:
df_dedup = Parallel(n_jobs=20)(delayed(resolve_duplicates_inputevents)(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name')))
df_dedup = Parallel(n_jobs=20, verbose=10)(delayed(resolve_duplicates_inputevents)(v, df_v) for v, df_v in all_dups.groupby('variable_name'))
else:
df_dedup = [resolve_duplicates_inputevents(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name'))]
df_out = pd.concat([not_dups, *df_dedup])
......@@ -266,128 +247,6 @@ def resolve_duplicates_continuous(data):
return data
# def resolve_duplicates(data):
# """
# Assume input format:
# ––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––
# | ID | t (or t_start + t_end) | variable_name | variable_value |
# ––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––––
# """
# print_header('Resolve duplicated event records', char='-')
# ### Chart events - duplicate rows
# print('*** CHARTEVENTS')
# df = data['CHARTEVENTS']
# m_dups = df.duplicated(subset=['ID', 't', 'variable_name'], keep=False)
# dups = df[m_dups]
# dup_variables = dups['variable_name'].unique()
# all_dups = df[df['variable_name'].isin(dup_variables)]
# not_dups = df[~df['variable_name'].isin(dup_variables)]
# def _resolve_duplicates_impl(v, df_v):
# # Categorical variables
# # Map to different variable names with value 0/1
# if pd.to_numeric(df_v['variable_value'], errors='ignore').dtype == 'object':
# df_mask = df_v.copy()
# df_mask['variable_value'] = 1
# df_mask = df_mask.drop_duplicates(subset=['ID', 't', 'variable_name'])
# df_attr = df_v.copy()
# df_attr['variable_name'] = df_attr['variable_name'].str.cat(df_attr['variable_value'], sep=': ')
# df_attr['variable_value'] = 1
# df_v_out = pd.concat([df_mask, df_attr], ignore_index=True).sort_values(by=['ID', 't', 'variable_name'])
# else:
# # Numerical variables: vitals, '226871', '226873'
# # Adjust timestamps by a small epsilon
# eps = 1e-6
# df_v_out = df_v.copy()
# for d_i, d_g in df_v.groupby('ID'):
# df_v_out.loc[d_g.index, 't'] += eps * np.arange(len(d_g))
# return df_v_out
# if parallel:
# df_dedup = Parallel(n_jobs=20)(delayed(_resolve_duplicates_impl)(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name')))
# else:
# df_dedup = [_resolve_duplicates_impl(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name'))]
# df_out = pd.concat([not_dups, *df_dedup])
# # assert sum(df_out.duplicated()) == 7 ##### Handle the case where the exact record is duplicated
# df_out.drop_duplicates(inplace=True)
# assert not any(df_out.duplicated(subset=['ID', 't', 'variable_name'], keep=False))
# data['CHARTEVENTS'] = df_out.sort_values(by=['ID', 't', 'variable_name']).reset_index(drop=True)
# ### Labs - duplicate rows
# # Adjust timestamps by a small epsilon
# print('*** LABEVENTS')
# df = data['LABEVENTS']
# df_lab = df.copy()
# eps = 1e-6
# m_dups = df_lab.duplicated(subset=['ID', 't', 'variable_name'], keep=False)
# df_lab_out = df_lab[m_dups].copy()
# for v, df_v in df_lab_out.groupby('variable_name'):
# for d_i, d_g in df_v.groupby('ID'):
# df_lab_out.loc[d_g.index, 't'] += eps * np.arange(len(d_g))
# df_lab_final = pd.concat([df_lab[~m_dups], df_lab_out])
# assert not any(df_lab_final.duplicated(subset=['ID', 't', 'variable_name'], keep=False))
# data['LABEVENTS'] = df_lab_final.sort_values(by=['ID', 't', 'variable_name']).reset_index(drop=True)
# # Remove repeated procedure events since they are just 0/1 mask
# print('*** PROCEDUREEVENTS_MV')
# data['PROCEDUREEVENTS_MV'] = data['PROCEDUREEVENTS_MV'].drop_duplicates(keep='first')
# # Handle duplicated input events
# ## Add up rates/amounts
# ## Map routes to separate indicator variables
# print('*** INPUTEVENTS_MV')
# df = data['INPUTEVENTS_MV']
# m_dups = df.duplicated(subset=['ID', 't', 'variable_name'], keep=False)
# dups = df[m_dups]
# dup_variables = dups['variable_name'].unique()
# all_dups = df[df['variable_name'].isin(dup_variables)]
# not_dups = df[~df['variable_name'].isin(dup_variables)]
# def resolve_duplicates_inputevents(v, df_v):
# # InputRoute - categorical
# if pd.to_numeric(df_v['variable_value'], errors='ignore').dtype == 'object':
# df_mask = df_v.copy()
# df_mask['variable_value'] = 1
# df_mask = df_mask.drop_duplicates(subset=['ID', 't', 'variable_name'])
# df_attr = df_v.copy()
# df_attr['variable_name'] = df_attr['variable_name'].str.cat(df_attr['variable_value'], sep=': ')
# df_attr['variable_value'] = 1
# df_v_out = pd.concat([df_mask, df_attr], ignore_index=True).drop_duplicates().sort_values(by=['ID', 't', 'variable_name'])
# else:
# # Numerical variables
# if len(v) == 6: # plain ITEMID
# # just use 0/1 indicator
# df_v_out = df_v.drop_duplicates(keep='first')
# else: # Rate/Amout - add up numbers at the same time stamp
# df_v_out = df_v.groupby(['ID', 't', 'variable_name'])[['variable_value']].sum().reset_index()
# return df_v_out
# if parallel:
# df_dedup = Parallel(n_jobs=20)(delayed(resolve_duplicates_inputevents)(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name')))
# else:
# df_dedup = [resolve_duplicates_inputevents(v, df_v) for v, df_v in tqdm(all_dups.groupby('variable_name'))]
# df_out = pd.concat([not_dups, *df_dedup])
# assert not any(df_out.duplicated(subset=['ID', 't', 'variable_name'], keep=False))
# data['INPUTEVENTS_MV'] = df_out
# print('Verifying no more duplicates...')
# for table_name, df_table in data.items():
# if 't' in df_table.columns:
# assert not any(df_table.duplicated(subset=['ID', 't', 'variable_name'])), '{} contains duplicate records'.format(table_name)
# return data
if __name__ == '__main__':
main()
......@@ -14,7 +14,7 @@
"outputs": [],
"source": [
"import yaml\n",
"with open('../config.yaml') as f:\n",
"with open('../../config.yaml') as f:\n",
" config = yaml.full_load(f)\n",
"\n",
"data_path = config['data_path']"
......@@ -96,12 +96,12 @@
"metadata": {},
"outputs": [],
"source": [
"my_labels = pd.read_csv('../' + data_path + 'population/mortality_48h.csv').set_index('ICUSTAY_ID')"
"my_labels = pd.read_csv('../' + data_path + 'population/mortality_48.0h.csv').set_index('ID')"
]
},
{
"cell_type": "code",
"execution_count": 10,
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
......@@ -125,7 +125,7 @@
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
......@@ -134,7 +134,7 @@
},
{
"cell_type": "code",
"execution_count": 12,
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
......@@ -143,7 +143,27 @@
},
{
"cell_type": "code",
"execution_count": 15,
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"8577"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(df_out)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
......@@ -152,7 +172,7 @@
},
{
"cell_type": "code",
"execution_count": 16,
"execution_count": 14,
"metadata": {},
"outputs": [
{
......@@ -161,7 +181,7 @@
"0.12020519995336365"
]
},
"execution_count": 16,
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
......@@ -194,7 +214,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
"version": "3.5.2"
}
},
"nbformat": 4,
......
#!/bin/bash
set -euxo pipefail
python prepare_input.py --outcome=ARF --T=4 --dt=1
python prepare_input.py --outcome=ARF --T=12 --dt=1
python prepare_input.py --outcome=Shock --T=4 --dt=1
python prepare_input.py --outcome=Shock --T=12 --dt=1
python prepare_input.py --outcome=mortality --T=48 --dt=1
mv ../data/processed/features/outcome=mortality,T=48.0,dt=1.0 ../data/processed/features/benchmark,outcome=mortality,T=48.0,dt=1.0
......@@ -3,15 +3,17 @@
- Extract and format data from structured tables in MIMIC-III as input to FIDDLE
- Goal: using data from all structured tables, generate time-invariant features **s** and time-series features **X**.
## Usage
## Steps to reproduce results
0. Modify `config.yaml` to specify `mimic3_path` and `data_path`.
### 1) Data Extraction
1. Data Extraction
- Execute `python -c "from extract_data import *; check_nrows();"` to verify the integrity of the raw CSV files.
- Run `python extract_data.py`.
1. Labels & Analyses
1. Labels & Cohort definitions
- Run `python generate_labels.py` to generate the event onset times and labels for two outcomes: ARF and shock. The output should be
```
ARF: {0: 13125, 1: 10495} N = 23620
......@@ -19,8 +21,13 @@
```
- Run the following notebooks in order: `LabelDistribution.ipynb`, `InclusionExclusion.ipynb` and `PopulationSummary.ipynb`.
1. Apply FIDDLE to generate features
1. Prepare input tables for each cohort
- Run `python prepare_input.py --outcome={outcome} --T={T} --dt={dt}`
### 2) Apply FIDDLE
1. Apply FIDDLE to generate features
- Run `python make_features.py --outcome={outcome} --T={T} --dt={dt}`
Note: a bash script is provided for generating features.
......@@ -31,3 +38,6 @@ The generated features and associated metadata are located in `{data_path}/featu
- `X.npz`: a sparse tensor of shape (N, L, D)
- `s.feature_names.txt`: names of _d_ time-invariant features
- `X.feature_names.txt`: names of _D_ time-series features
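As a sanity check before modeling, the generated features can be lined up with the cohort labels produced in the earlier steps. This is a minimal sketch under two assumptions: that the population CSVs contain an `ID` column plus a `{outcome}_LABEL` column (following the `'{}_LABEL'.format(task)` pattern used above), and that the rows of `X.npz` are ordered consistently with the population file.
```python
import pandas as pd
import sparse  # pydata/sparse, assumed as in the loading sketch in the top-level README

data_path = '../data/processed/'  # illustrative; taken from config.yaml in practice
feature_dir = data_path + 'features/outcome=ARF,T=4.0,dt=1.0/'

# Cohort labels written by the cohort-definition step (columns: ID, ARF_LABEL)
labels = pd.read_csv(data_path + 'population/ARF_4.0h.csv').set_index('ID')

# Time-dependent features: sparse tensor of shape (N, L, D)
X = sparse.load_npz(feature_dir + 'X.npz')

# One (L, D) feature slab per cohort member, assumed to be in population-file order
assert X.shape[0] == len(labels)
y = labels['ARF_LABEL'].values
```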
### 3) ML Models
[Contents will be added soon.]