Skip to content
Snippets Groups Projects
Commit 124d643f authored by Josef Hardi's avatar Josef Hardi
Browse files

refactor: reuse methods to avoid duplicates

parent 4e2788b4
No related branches found
No related tags found
No related merge requests found
......@@ -19,9 +19,9 @@ class Optum:
_dataset_id = 'optum_zip5'
# Table IDs
_member_table_id = f'{_project_id}.{_dataset_id}.member'
_claims_table_id = f'{_project_id}.{_dataset_id}.medical_claims'
_diagnosis_table_id = f'{_project_id}.{_dataset_id}.medical_diagnosis'
_member_table_id = f'{_project_id}.{_dataset_id}_member.main'
_claims_table_id = f'{_project_id}.{_dataset_id}_medical_claims.main'
_diagnosis_table_id = f'{_project_id}.{_dataset_id}_medical_diagnosis.main'
# Google BigQuery Client
_client = bigquery.Client()
......@@ -111,16 +111,24 @@ class Optum:
_OBSERVATION_PERIOD = """
observation_period AS (
SELECT DISTINCT
t0.Patid AS person_id,
t0.Eligeff AS observation_period_start_date,
t0.Eligend AS observation_period_end_date,
CAST(CASE
WHEN (STRPOS(t0.Zipcode_5, '_') = 0) THEN t0.Zipcode_5
ELSE SUBSTR(t0.Zipcode_5, 1, STRPOS(t0.Zipcode_5, '_') - 1)
END AS INT64) AS zip
FROM
{person_table} AS t0
SELECT t0.Patid AS person_id,
MIN(t0.Eligeff) AS observation_period_start_date,
MAX(t0.Eligend) AS observation_period_end_date,
t0.zip AS zip
FROM (
SELECT DISTINCT
t1.Patid,
t1.Eligeff,
t1.Eligend,
t1.Product,
CAST(CASE
WHEN (STRPOS(t1.Zipcode_5, '_') = 0) THEN t1.Zipcode_5
ELSE SUBSTR(t1.Zipcode_5, 1, STRPOS(t1.Zipcode_5, '_') - 1)
END AS INT64) AS zip
FROM
{person_table} AS t1
) AS t0
GROUP BY t0.Patid, t0.zip, t0.Product
)"""
_PERSON = """
......@@ -178,10 +186,10 @@ class Optum:
JOIN (
SELECT *, row_number() OVER (PARTITION BY person_id ORDER BY condition_start_date) AS ordinal
FROM condition_occurrence
WHERE condition_source_value IN ({event_valueset})
{condition_filter}
) AS c
ON p.person_id = c.person_id
{event_occurrence_filters}
{condition_occurrence_filters}
) AS e
ON op.person_id = e.person_id AND op.zip = e.zip
AND e.condition_start_date >= op.observation_period_start_date
......@@ -221,12 +229,9 @@ class Optum:
FROM (
SELECT p.person_id, p.zip
FROM qualifying_event AS p
JOIN (
SELECT *
FROM person
{age_filter}
) AS e
ON p.person_id=e.person_id
JOIN person AS e
ON p.person_id=e.person_id AND p.zip=e.zip
{age_filter}
)"""
_INCLUSION_CRITERIA_CONDITION = """
......@@ -240,16 +245,16 @@ class Optum:
JOIN (
SELECT *
FROM condition_occurrence
WHERE condition_source_value IN ({event_valueset})
{condition_filter}
) AS c
ON pe.person_id = c.person_id
{event_occurrence_filters}
{condition_occurrence_filters}
) AS e
ON p.person_id = e.person_id AND p.zip = e.zip
AND e.condition_start_date >= p.observation_period_start_date
AND e.condition_start_date <= p.observation_period_end_date
{event_occurrence_before_index_date}
{event_occurrence_after_index_date}
{condition_occurrence_before_index_date}
{condition_occurrence_after_index_date}
)"""
......@@ -261,14 +266,6 @@ class Optum:
)
)"""
# SQL template for getting a population within zip codes
_GET_LOCATION_POPULATIONS = """
WITH {person}
SELECT person_id, year_of_birth, gender_source_value, zip
FROM person
{filters}
ORDER BY zip"""
# SQL template for getting a cohort population
_GET_COHORT_POPULATIONS = """
WITH {observation_period}, {person}, {condition_occurrence}, {qualifying_event}, {inclusion_criteria}, {inclusion_event},
......@@ -320,216 +317,154 @@ class Optum:
"""
result = self._client.query(query)
return result.to_dataframe()
def get_populations(self, zips, index_date=None, constraining_properties=None):
"""Returns a :obj:`pandas.DataFrame` of people population located at the given \
:code:`zips`.
Args:
zips (:obj:`list`): ZIP codes representing the people's location.
index_date (:obj:`str`): The
Returns:
def get_cohort(self, zips, entry_criteria, inclusion_criteria=None, exit_criteria=None):
"""
filters_sql = ""
if constraining_properties:
filters = {}
for constraining_property in constraining_properties:
for k, v in constraining_property.items():
if k == 'gender':
filter_template = "gender_source_value = '{value_1}'"
filter_str = filter_template.format(value_1=self._GENDERS[v]['value'])
filters['gender'] = filter_str
elif k == 'ageGroup':
filter_template = "EXTRACT(YEAR FROM DATE '{index_date}')-year_of_birth >= {value_1} AND EXTRACT(YEAR FROM DATE '{index_date}')-year_of_birth <= {value_2}"
filter_str = filter_template.format(
index_date=index_date if index_date else date.today().strftime("%Y-%m-%d"),
value_1=self._AGE_GROUPS[v]['start'],
value_2=self._AGE_GROUPS[v]['end'])
filters['age'] = filter_str
elif k == 'areRange':
if not re.match(r"[0-9]*-[0-9]*", v): raise ValueError("Unable to parse value range expression. Some valid examples: -22, or 16-22, or 22-")
filter_template = "EXTRACT(YEAR FROM DATE '{index_date}')-year_of_birth >= {value_1} AND EXTRACT(YEAR FROM DATE '{index_date}')-year_of_birth <= {value_2}"
filter_str = filter_template.format(
index_date=index_date if index_date else date.today().strftime("%Y-%m-%d"),
value_1=int(v.split('-')[0]) if v.split('-')[0] != '' else 0,
value_2=int(v.split('-')[1]) if v.split('-')[1] != '' else 999)
filters['age'] = filter_str
elif k == 'ageAt':
filter_template = "EXTRACT(YEAR FROM DATE '{index_date}')-year_of_birth = {value_1}"
filter_str = filter_template.format(
index_date=index_date if index_date else date.today().strftime("%Y-%m-%d"),
value_1=v)
filters['age'] = filter_str
filters_sql = "WHERE {conditions}".format(conditions="\n AND ".join(v for v in filters.values()))
"""
entry_criteria_sql = self._build_entry_criteria_sql(entry_criteria)
if inclusion_criteria:
inclusion_criteria_sql = self._build_inclusion_criteria_sql(inclusion_criteria)
inclusion_event_sql = self._build_inclusion_event_sql(inclusion_criteria)
if exit_criteria:
exit_criteria_sql = self._build_exit_criteria_sql(exit_criteria)
query = textwrap.dedent(self._GET_COHORT_POPULATIONS.format(
observation_period=self._OBSERVATION_PERIOD.format(
person_table=self._member_table_id),
person=self._PERSON.format(
person_table=self._member_table_id,
zipcode_valueset=",".join(str(z) for z in zips)),
condition_occurrence=self._CONDITION_OCCURRENCE.format(
condition_occurrence_table=self._diagnosis_table_id,
claims_table=self._claims_table_id),
qualifying_event=entry_criteria_sql,
inclusion_criteria=inclusion_criteria_sql,
inclusion_event=inclusion_event_sql,
inclusion_criteria_count=len(inclusion_criteria)))
query = textwrap.dedent(
self._GET_LOCATION_POPULATIONS.format(
person=self._PERSON.format(
person_table=self._member_table_id,
zipcode_valueset=",".join(str(z) for z in zips)),
filters=filters_sql))
# Execute the query
# print(query)
return self.evaluate(query)
def get_cohort(self, zips, entry_criteria, inclusion_criteria=None, exit_criteria=None):
"""
"""
ec_event_valueset = ""
ec_event_occurrence_filters_sql = ""
ec_continuous_observation_filters_sql = ""
def _build_entry_criteria_sql(self, entry_criteria):
entry_criteria_sql = ""
for k, v in entry_criteria.items():
if k == 'event':
event_type = list(v.keys())[0]
if event_type == 'condition':
event_value = v['condition']
if type(event_value) is str:
ec_event_valueset=",".join(f'"{x}"' for x in self._CONDITIONS[event_value]['codes'])
elif type(event_value) is list:
ec_event_valueset=",".join(f'"{x}"' for x in event_value)
condition_occurrence_parameters = self._get_condition_occurrence_parameters(entry_criteria)
entry_criteria_sql = self._build_condition_entry_criteria_sql(condition_occurrence_parameters)
return entry_criteria_sql
def _get_condition_occurrence_parameters(self, criteria):
condition_occurrence_parameters = {}
condition_occurrence_parameters['condition_filter'] = ""
condition_occurrence_parameters['condition_occurrence_filter'] = ""
condition_occurrence_parameters['condition_restriction_before_index_date_filter'] = ""
condition_occurrence_parameters['condition_restriction_after_index_date_filter'] = "AND e.condition_start_date <= DATE_ADD(CAST(p.index_date AS date), INTERVAL 0 DAY)"
condition_occurrence_parameters['continuous_observation_filter'] = ""
for k, v in criteria.items():
if k == 'event':
condition_value = v['condition']
if type(condition_value) is str:
condition_valueset = ",".join(f'"{x}"' for x in self._CONDITIONS[condition_value]['codes'])
condition_occurrence_parameters['condition_filter'] = f'WHERE condition_source_value IN ({condition_valueset})'
elif type(condition_value) is list:
condition_valueset = ",".join(f'"{x}"' for x in condition_value)
condition_occurrence_parameters['condition_filter'] = f'WHERE condition_source_value IN ({condition_valueset})'
elif k == 'occurrence':
occurrence_type = list(v.keys())[0]
if occurrence_type == 'startDateBefore':
occurrence_date = v['startDateBefore']
ec_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date < "{occurrence_date}"'
condition_occurrence_parameters['condition_occurrence_filter'] = f'WHERE c.ordinal = 1 AND c.condition_start_date < "{occurrence_date}"'
elif occurrence_type == 'startDateAfter':
occurrence_date = v['startDateAfter']
ec_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date > "{occurrence_date}"'
condition_occurrence_parameters['condition_occurrence_filter'] = f'WHERE c.ordinal = 1 AND c.condition_start_date > "{occurrence_date}"'
elif occurrence_type == 'startDateAt':
occurrence_date = v['startDateAt']
ec_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date = "{occurrence_date}"'
condition_occurrence_parameters['condition_occurrence_filter'] = f'WHERE c.ordinal = 1 AND c.condition_start_date = "{occurrence_date}"'
elif occurrence_type == 'startDateBetween':
occurrence_start_date = v['startDateBetween'].split('/')[0]
occurrence_end_date = v['startDateBetween'].split('/')[1]
ec_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date >= "{occurrence_start_date}" AND c.condition_start_date <= "{occurrence_end_date}"'
condition_occurrence_parameters['condition_occurrence_filter'] = f'WHERE c.ordinal = 1 AND c.condition_start_date >= "{occurrence_start_date}" AND c.condition_start_date <= "{occurrence_end_date}"'
elif k == 'restriction':
for restriction_type, restriction_value in v.items():
if restriction_type == 'withinDaysBeforeIndexDate':
condition_occurrence_parameters['condition_restriction_before_index_date_filter'] = f'AND e.condition_start_date >= DATE_ADD(CAST(p.index_date AS date), INTERVAL -{restriction_value} DAY)'
elif restriction_type == 'withinDaysAfterIndexDate':
condition_occurrence_parameters['condition_restriction_after_index_date_filter'] = f'AND e.condition_start_date <= DATE_ADD(CAST(p.index_date AS date), INTERVAL {restriction_value} DAY)'
elif k == 'continuousObservation':
days_before = v['minDaysBeforeIndexDate'] if 'minDaysBeforeIndexDate' in v else 0
days_after = v['minDaysAfterIndexDate'] if 'minDaysAfterEntryEvent' in v else 0
ec_continuous_observation_filters_sql = f'WHERE DATE_ADD(op.observation_period_start_date, interval {days_before} DAY) <= e.condition_start_date AND DATE_ADD(e.condition_start_date, interval {days_after} DAY) <= op.observation_period_end_date'
entry_criteria_sql = textwrap.dedent(
self._ENTRY_CRITERIA.format(
event_valueset=ec_event_valueset,
event_occurrence_filters=ec_event_occurrence_filters_sql,
continuous_observation_filters=ec_continuous_observation_filters_sql))
condition_occurrence_parameters['continuous_observation_filter'] = f'WHERE DATE_ADD(op.observation_period_start_date, interval {days_before} DAY) <= e.condition_start_date AND DATE_ADD(e.condition_start_date, interval {days_after} DAY) <= op.observation_period_end_date'
return condition_occurrence_parameters
def _build_condition_entry_criteria_sql(self, condition_occurrence_parameters):
return textwrap.dedent(self._ENTRY_CRITERIA.format(
condition_filter=condition_occurrence_parameters['condition_filter'],
condition_occurrence_filters=condition_occurrence_parameters['condition_occurrence_filter'],
continuous_observation_filters=condition_occurrence_parameters['continuous_observation_filter']))
def _build_inclusion_criteria_sql(self, inclusion_criteria):
inclusion_criteria_list = []
if inclusion_criteria:
for index, inclusion_criterion in enumerate(inclusion_criteria, start=0):
ic_type = ""
ic_event_valueset = ""
ic_event_occurrence_filters_sql = ""
ic_event_occurrence_before_index_date_sql = ""
ic_event_occurrence_after_index_date_sql = "AND e.condition_start_date <= DATE_ADD(CAST(p.index_date AS date), INTERVAL 0 DAY)"
ic_demographics_filters = {}
for k, v in inclusion_criterion.items():
if k == 'gender':
ic_type = "DEMOGRAPHICS"
filter_template = "WHERE gender_source_value = '{value_1}'"
filter_str = filter_template.format(value_1=self._GENDERS[v]['value'])
ic_demographics_filters['gender'] = filter_str
elif k == 'ageGroup':
ic_type = "DEMOGRAPHICS"
filter_template = "WHERE EXTRACT(YEAR FROM index_date)-year_of_birth >= {value_1} AND EXTRACT(YEAR FROM index_date)-year_of_birth <= {value_2}"
filter_str = filter_template.format(
value_1=self._AGE_GROUPS[v]['start'],
value_2=self._AGE_GROUPS[v]['end'])
ic_demographics_filters['age'] = filter_str
elif k == 'ageRange':
ic_type = "DEMOGRAPHICS"
if not re.match(r"[0-9]*-[0-9]*", v): raise ValueError("Unable to parse value range expression. Some valid examples: -22, or 16-22, or 22-")
filter_template = "WHERE EXTRACT(YEAR FROM index_date)-year_of_birth >= {value_1} AND EXTRACT(YEAR FROM index_date)-year_of_birth <= {value_2}"
filter_str = filter_template.format(
value_1=int(v.split('-')[0]) if v.split('-')[0] != '' else 0,
value_2=int(v.split('-')[1]) if v.split('-')[1] != '' else 999)
ic_demographics_filters['age'] = filter_str
elif k == 'ageAt':
ic_type = "DEMOGRAPHICS"
filter_template = "WHERE EXTRACT(YEAR FROM index_date)-year_of_birth = {value_1}"
filter_str = filter_template.format(value_1=v)
ic_demographics_filters['age'] = filter_str
elif k == 'event':
ic_type = "CONDITION_OCCURRENCE"
event_type = list(v.keys())[0]
if event_type == 'condition':
event_value = v['condition']
if type(event_value) is str:
ic_event_valueset=",".join(f'"{x}"' for x in self._CONDITIONS[event_value]['codes'])
elif type(event_value) is list:
ic_event_valueset=",".join(f'"{x}"' for x in event_value)
elif k == 'occurrence':
ic_type = "CONDITION_OCCURRENCE"
for occurrence_type, occurrence_value in v.items():
if occurrence_type == 'startDateBefore':
ic_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date < "{occurrence_value}"'
elif occurrence_type == 'startDateAfter':
ic_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date > "{occurrence_value}"'
elif occurrence_type == 'startDateAt':
ic_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date = "{occurrence_value}"'
elif occurrence_type == 'startDateBetween':
occurrence_start_date = occurrence_value.split('/')[0]
occurrence_end_date = occurrence_value.split('/')[1]
ic_event_occurrence_filters_sql = f'WHERE c.ordinal = 1 AND c.condition_start_date >= "{occurrence_start_date}" AND c.condition_start_date <= "{occurrence_end_date}"'
elif k == 'restriction':
ic_type = "CONDITION_OCCURRENCE"
for restriction_type, restriction_value in v.items():
if restriction_type == 'minDaysBeforeIndexDate':
ic_event_occurrence_before_index_date_sql = f'AND e.condition_start_date >= DATE_ADD(CAST(p.index_date AS date), INTERVAL -{restriction_value} DAY)'
elif restriction_type == 'minDaysAfterIndexDate':
ic_event_occurrence_after_index_date_sql = f'AND e.condition_start_date <= DATE_ADD(CAST(p.index_date AS date), INTERVAL {restriction_value} DAY)'
if ic_type == "DEMOGRAPHICS":
for demographics_type, filter_value in ic_demographics_filters.items():
if demographics_type == 'gender':
inclusion_criteria_sql = self._INCLUSION_CRITERIA_GENDER.format(
gender_filter=filter_value)
inclusion_criteria_list.append(self._INCLUSION_CRITERIA.format(
event_index=index,
inclusion_criteria=inclusion_criteria_sql))
elif demographics_type == 'age':
inclusion_criteria_sql = self._INCLUSION_CRITERIA_AGE.format(
age_filter=filter_value)
inclusion_criteria_list.append(self._INCLUSION_CRITERIA.format(
event_index=index,
inclusion_criteria=inclusion_criteria_sql))
elif ic_type == "CONDITION_OCCURRENCE":
inclusion_criteria_sql = self._INCLUSION_CRITERIA_CONDITION.format(
event_valueset=ic_event_valueset,
event_occurrence_filters=ic_event_occurrence_filters_sql,
event_occurrence_after_index_date=ic_event_occurrence_after_index_date_sql,
event_occurrence_before_index_date=ic_event_occurrence_before_index_date_sql)
inclusion_criteria_list.append(self._INCLUSION_CRITERIA.format(
event_index=index,
inclusion_criteria=inclusion_criteria_sql))
ic_type = ""
for index, inclusion_criterion in enumerate(inclusion_criteria, start=0):
for k, v in inclusion_criterion.items():
if k == 'gender':
gender_code = self._GENDERS[v]['value']
filter_sql = f'WHERE gender_source_value = "{gender_code}"'
inclusion_criteria_sql = self._build_gender_inclusion_criteria_sql(index, filter_sql)
inclusion_criteria_list.append(inclusion_criteria_sql)
elif k == 'ageGroup':
start_age = self._AGE_GROUPS[v]['start']
end_age = self._AGE_GROUPS[v]['end']
filter_sql = f'WHERE EXTRACT(YEAR FROM index_date)-year_of_birth >= {start_age} AND EXTRACT(YEAR FROM index_date)-year_of_birth <= {end_age}'
inclusion_criteria_sql = self._build_age_inclusion_criteria_sql(index, filter_sql)
inclusion_criteria_list.append(inclusion_criteria_sql)
elif k == 'ageRange':
if not re.match(r"[0-9]*-[0-9]*", v): raise ValueError("Unable to parse value range expression. Some valid examples: -22, or 16-22, or 22-")
start_age = int(v.split('-')[0]) if v.split('-')[0] != '' else 0
end_age = int(v.split('-')[1]) if v.split('-')[1] != '' else 999
filter_sql = f'WHERE EXTRACT(YEAR FROM index_date)-year_of_birth >= {start_age} AND EXTRACT(YEAR FROM index_date)-year_of_birth <= {end_age}'
inclusion_criteria_sql = self._build_age_inclusion_criteria_sql(index, filter_sql)
inclusion_criteria_list.append(inclusion_criteria_sql)
elif k == 'ageAt':
age_value = v
filter_sql = f'WHERE EXTRACT(YEAR FROM index_date)-year_of_birth = {age_value}'
inclusion_criteria_sql = self._build_age_inclusion_criteria_sql(index, filter_sql)
inclusion_criteria_list.append(inclusion_criteria_sql)
elif k == 'event':
event_type = list(v.keys())[0]
if event_type == 'condition':
condition_occurrence_parameters = self._get_condition_occurrence_parameters(inclusion_criterion)
inclusion_criteria_sql = self._build_condition_inclusion_criteria_sql(index, condition_occurrence_parameters)
inclusion_criteria_list.append(inclusion_criteria_sql)
return ",".join(ic for ic in inclusion_criteria_list)
inclusion_criteria_sql = ",".join(ic for ic in inclusion_criteria_list)
inclusion_criteria_members = [f'SELECT inclusion_rule_id, person_id, zip FROM inclusion_event_{i}' for i in range(len(inclusion_criteria_list))]
inclusion_event_sql = self._INCLUSION_EVENT.format(
union_inclusion_criteria="\nUNION ALL\n".join(member_sql for member_sql in inclusion_criteria_members))
query = textwrap.dedent(
self._GET_COHORT_POPULATIONS.format(
observation_period=self._OBSERVATION_PERIOD.format(
person_table=self._member_table_id),
person=self._PERSON.format(
person_table=self._member_table_id,
zipcode_valueset=",".join(str(z) for z in zips)),
condition_occurrence=self._CONDITION_OCCURRENCE.format(
condition_occurrence_table=self._diagnosis_table_id,
claims_table=self._claims_table_id),
qualifying_event=entry_criteria_sql,
inclusion_criteria=inclusion_criteria_sql,
inclusion_event=inclusion_event_sql,
inclusion_criteria_count=len(inclusion_criteria_list)))
# Execute the query
print(query)
# return self.evaluate(query)
\ No newline at end of file
def _build_gender_inclusion_criteria_sql(self, index, gender_filter_sql):
inclusion_criteria_sql = self._INCLUSION_CRITERIA_GENDER.format(gender_filter=gender_filter_sql)
return self._INCLUSION_CRITERIA.format(
event_index=index,
inclusion_criteria=inclusion_criteria_sql)
def _build_age_inclusion_criteria_sql(self, index, age_filter_sql):
inclusion_criteria_sql = self._INCLUSION_CRITERIA_AGE.format(age_filter=age_filter_sql)
return self._INCLUSION_CRITERIA.format(
event_index=index,
inclusion_criteria=inclusion_criteria_sql)
def _build_condition_inclusion_criteria_sql(self, index, condition_occurrence_parameters):
inclusion_criteria_sql = self._INCLUSION_CRITERIA_CONDITION.format(
condition_filter=condition_occurrence_parameters['condition_filter'],
condition_occurrence_filters=condition_occurrence_parameters['condition_occurrence_filter'],
condition_occurrence_after_index_date=condition_occurrence_parameters['condition_restriction_after_index_date_filter'],
condition_occurrence_before_index_date=condition_occurrence_parameters['condition_restriction_before_index_date_filter'])
return self._INCLUSION_CRITERIA.format(
event_index=index,
inclusion_criteria=inclusion_criteria_sql)
def _build_inclusion_event_sql(self, inclusion_criteria):
ic_size = len(inclusion_criteria)
inclusion_criteria_members = [f'SELECT inclusion_rule_id, person_id, zip FROM inclusion_event_{i}' for i in range(ic_size)]
return self._INCLUSION_EVENT.format(
union_inclusion_criteria="\nUNION ALL\n".join(member_sql for member_sql in inclusion_criteria_members))
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment