Skip to content
Snippets Groups Projects
Commit 60ea5704 authored by Josef Hardi's avatar Josef Hardi
Browse files

Initial commit

parents
No related branches found
No related tags found
No related merge requests found
"""
"""
import os
import re
import textwrap
from redivis import bigquery
class Optum:
    """Builds and runs SQL queries against the Optum ZIP5 dataset.

    The class assembles SQL text from the templates below by substituting
    Optum column names (``_VARIABLES``), table identifiers (``_TABLES``) and
    value sets, then submits the text through the ``bigquery`` client
    imported from the ``redivis`` package.

    Public entry points:

    * :meth:`get_populations` -- people living in a set of ZIP codes.
    * :meth:`get_cohort` -- people in a set of ZIP codes having a diagnosis
      claim that matches the entry criteria.
    """

    # BigQuery client. Created once, when the class body is executed, and
    # shared by every instance.
    _client = bigquery.Client()

    # Variables in the Optum database (logical name -> Optum column name)
    _VARIABLES = {
        'claim_id': 'Clmid',
        'claim_date': 'Fst_Dt',
        'eligibility_start_date': 'Eligeff',
        'eligibility_end_date': 'Eligend',
        'person_id': 'Patid',
        'gender': 'Gdr_Cd',
        'birth_year': 'Yrdob',
        'zip': 'Zipcode_5',
        'diagnosis_code': 'Diag',
        'icd_flag': 'Icd_Flag',
    }

    # Tables in the Optum database
    # _TABLES = {
    #     'person': 'Optum ZIP5 Member',
    #     'diagnosis': 'Optum ZIP5 Medical Diagnosis',
    #     'drug': 'Optum ZIP5 Rx Pharmacy',
    #     'claims': 'Optum ZIP5 Medical Claims'
    # }

    # Tables in the Optum database for 1% sample
    # (values are substituted verbatim between backticks in the SQL)
    _TABLES = {
        'person': '23224',
        'diagnosis': '23212',
        'drug': '23228',
        'claims': '23236',
    }

    # Value set for human genders ('value' is compared to the gender column)
    _GENDERS = {
        'Male': {
            'name': 'Gender Male',
            'value': 'M',
        },
        'Female': {
            'name': 'Gender Female',
            'value': 'F',
        },
    }

    # Value set for age groups. 'start' and 'end' are inclusive bounds;
    # 999 serves as the open-ended upper bound.
    _AGE_GROUPS = {
        'Under5Years': {
            'name': 'Under 5 years',
            'start': 0,
            'end': 4,
            'unitOfMeasurement': 'Year',
        },
        'Under18Years': {
            'name': 'Under 18 years',
            'start': 0,
            'end': 17,
            'unitOfMeasurement': 'Year',
        },
        'Years5To17': {
            'name': '5 to 17 years',
            'start': 5,
            'end': 17,
            'unitOfMeasurement': 'Year',
        },
        'Years18To24': {
            'name': '18 to 24 years',
            'start': 18,
            'end': 24,
            'unitOfMeasurement': 'Year',
        },
        'Years18To44': {
            'name': '18 to 44 years',
            'start': 18,
            'end': 44,
            'unitOfMeasurement': 'Year',
        },
        'Years25To44': {
            'name': '25 to 44 years',
            'start': 25,
            'end': 44,
            'unitOfMeasurement': 'Year',
        },
        'Years45To64': {
            'name': '45 to 64 years',
            'start': 45,
            'end': 64,
            'unitOfMeasurement': 'Year',
        },
        'Above65Years': {
            'name': 'Above 65 years',
            'start': 65,
            'end': 999,
            'unitOfMeasurement': 'Year',
        },
    }

    # Value set for disease conditions
    _CONDITIONS = {
        'Asthma': {
            'name': 'Asthma',
            'vocabulary': 'ICD9',
            'codes': (
                "49300", "49301", "49302", "49310", "49311", "49312",
                "49320", "49321", "49322", "49382", "49390", "49391",
                "49392",
            ),
        },
    }

    # SQL template for getting a population within zip codes.
    # The zip column is truncated at the first '_' (if any) and cast to
    # INT64 before being compared with the requested ZIP codes.
    _GET_POPULATIONS_TEMPLATE = """
        -- START 'Get Population Within ZIP Codes' >>>>
        WITH populations_within_zips AS (
          SELECT DISTINCT
            t1.person_id,
            t1.gender_source_value,
            t1.year_of_birth,
            t1.zip,
            t1.observation_period_start_date,
            t1.observation_period_end_date
          FROM (
            SELECT
              t0.{var_1} AS person_id,
              t0.{var_2} AS gender_source_value,
              t0.{var_3} AS year_of_birth,
              t0.{var_4} AS observation_period_start_date,
              t0.{var_5} AS observation_period_end_date,
              CAST(CASE
                WHEN (STRPOS(t0.{var_6}, '_') = 0) THEN t0.{var_6}
                ELSE SUBSTR(t0.{var_6}, 1, STRPOS(t0.{var_6}, '_') - 1)
              END AS INT64) AS zip
            FROM
              `{table_1}` AS t0
          ) AS t1
          WHERE
            t1.zip in ({valueset_1})
        )
        -- <<<< END 'Get Population Within ZIP Codes'
        """

    # SQL template for getting a population with a certain disease.
    # Only diagnosis rows whose ICD flag column equals 9 are kept
    # (presumably ICD-9 coded rows -- confirm against the data dictionary).
    _GET_POP_CONDITION_TEMPLATE = """
        -- START 'Entry Criteria' >>>>
        SELECT DISTINCT
          t0.person_id,
          t0.gender_source_value,
          t0.year_of_birth,
          t0.observation_period_start_date,
          t0.observation_period_end_date,
          t0.zip,
          t1.{var_2} AS claim_id,
          t1.{var_3} AS condition_source_value
        FROM
          populations_within_zips AS t0
        LEFT JOIN
          `{table_1}` AS t1
        ON t0.person_id = t1.{var_1}
        WHERE
          t1.{var_3} in ({valueset_1})
          AND t1.{var_4} = 9
        -- <<< END 'Entry Criteria'
        """

    # SQL template wrapping the population CTE ({module}) and the entry
    # criteria subquery ({subquery_1}) into a `primary_events` CTE. The index
    # date is the earliest claim date per (person, claim) and must fall
    # inside the person's observation period, at least {value_1} days after
    # its start and at least {value_2} days before its end.
    _PRIMARY_EVENTS_TEMPLATE = """
        WITH primary_events AS (
        {module}
        SELECT
          t1.person_id,
          t1.gender_source_value,
          t1.year_of_birth,
          t1.zip,
          t1.condition_source_value,
          t2.index_date,
          t1.observation_period_start_date,
          t1.observation_period_end_date
        FROM (
        {subquery_1}
        ) AS t1
        LEFT JOIN (
          SELECT DISTINCT
            t1.person_id,
            t1.claim_id,
            MIN(t1.claim_date) OVER (PARTITION BY t1.person_id, t1.claim_id) AS index_date
          FROM (
            SELECT
              t0.{var_1} AS person_id,
              t0.{var_2} AS claim_id,
              t0.{var_3} AS claim_date
            FROM
              `{table_1}` AS t0
          ) AS t1
        ) AS t2
        ON t1.person_id = t2.person_id AND t1.claim_id = t2.claim_id
          AND t2.index_date >= t1.observation_period_start_date
          AND t2.index_date <= t1.observation_period_end_date
        WHERE DATE_ADD(t1.observation_period_start_date, interval {value_1} DAY) <= t2.index_date
          AND DATE_ADD(t2.index_date, interval {value_2} DAY) <= t1.observation_period_end_date
        )
        """

    def info(self):
        """Prints a brief description of the dataset to standard output."""
        print("Optum ZIP5 dataset v7.1")

    def evaluate(self, query):
        """Runs a SQL query and returns its result as a data frame.

        Args:
            query (:obj:`str`): The SQL text to submit through the client.

        Returns:
            The value of ``to_dataframe()`` called on the query result.
        """
        result = self._client.query(query)
        return result.to_dataframe()

    def get_populations(self, zips, index_date=None, constraining_properties=None):
        """Returns the people located at the given :code:`zips`.

        Args:
            zips (:obj:`list`): ZIP codes representing the people's location.
                Every item must be convertible with :func:`int`.
            index_date (:obj:`str`): Date literal used to compute a person's
                age (year of ``index_date`` minus year of birth). Required
                whenever an age constraint is given.
            constraining_properties (:obj:`dict`): Optional filters. The
                recognised keys are ``'gender'`` (a key of ``_GENDERS``),
                ``'ageGroup'`` (a key of ``_AGE_GROUPS``), ``'ageRange'``
                (``"16-22"``, ``"-22"`` or ``"22-"``) and ``'ageAt'`` (an
                exact age). Other keys are ignored. The three age keys
                share one slot, so the last one given wins.

        Returns:
            The data frame produced by :meth:`evaluate` for the query.

        Raises:
            ValueError: If ``zips`` is empty or holds a non-integer value,
                if an age range cannot be parsed, or if an age constraint
                is given without ``index_date``.
            KeyError: If the gender or age group name is unknown.
        """
        query = f'{self._populations_sql(zips)}\nSELECT * FROM populations_within_zips'
        if constraining_properties:
            year_expr = f"EXTRACT(YEAR FROM DATE '{index_date}')" if index_date else None
            filter_sql = self._build_filter_sql(constraining_properties, year_expr)
            # Only add a WHERE clause when there is something to filter on;
            # a bare "WHERE" is a SQL syntax error.
            if filter_sql:
                query = f'{query}\nWHERE {filter_sql}'
        # Show the generated SQL, then execute it.
        print(query)
        return self.evaluate(query)

    def get_cohort(self, zips, entry_criteria, inclusion_criteria=None, exit_criteria=None):
        """Returns the cohort of people in :code:`zips` meeting the criteria.

        Args:
            zips (:obj:`list`): ZIP codes representing the people's location.
                Every item must be convertible with :func:`int`.
            entry_criteria (:obj:`dict`): Must contain ``'condition'`` (a key
                of ``_CONDITIONS``) or ``'conditionCodes'`` (an iterable of
                diagnosis codes); when both are present the one appearing
                last in the dict is used. May also contain ``'days_prior'``
                and ``'days_subsequent'`` (both default to 0): the minimum
                number of days between the observation period start and the
                index date, and between the index date and the observation
                period end, respectively.
            inclusion_criteria (:obj:`dict`): Optional filters with the same
                keys as ``constraining_properties`` of
                :meth:`get_populations`; ages are computed at the index date
                of each event.
            exit_criteria: Accepted for interface compatibility; currently
                not used.

        Returns:
            The data frame produced by :meth:`evaluate` for the query.

        Raises:
            ValueError: If ``zips`` is empty or holds a non-integer value,
                if no condition entry criterion is given, or if an age range
                cannot be parsed.
            KeyError: If the condition, gender or age group name is unknown.
        """
        populations_sql = self._populations_sql(zips)

        entry_criteria_subquery = ""
        for key, value in entry_criteria.items():
            if key == 'condition':
                codes = self._CONDITIONS[value]['codes']
            elif key == 'conditionCodes':
                codes = value
            else:
                continue
            entry_criteria_subquery = self._condition_sql(codes)
        if not entry_criteria_subquery:
            # Without a subquery the statement would contain "FROM ( ) AS t1".
            raise ValueError(
                "The entry criteria require a 'condition' or 'conditionCodes' item")

        primary_events_sql = textwrap.dedent(
            self._PRIMARY_EVENTS_TEMPLATE.format(
                module=populations_sql,
                subquery_1=entry_criteria_subquery,
                var_1=self._VARIABLES['person_id'],
                var_2=self._VARIABLES['claim_id'],
                var_3=self._VARIABLES['claim_date'],
                table_1=self._TABLES['claims'],
                value_1=entry_criteria.get('days_prior', 0),
                value_2=entry_criteria.get('days_subsequent', 0)))

        query = f'{primary_events_sql}\nSELECT * FROM primary_events'
        if inclusion_criteria:
            filter_sql = self._build_filter_sql(
                inclusion_criteria, "EXTRACT(YEAR FROM index_date)")
            if filter_sql:
                query = f'{query}\nWHERE {filter_sql}'
        # Execute the query
        return self.evaluate(query)

    def _populations_sql(self, zips):
        """Renders the 'population within ZIP codes' CTE for ``zips``."""
        # The zip column is cast to INT64 in the template, so every requested
        # ZIP code is coerced to int; this also keeps arbitrary text out of
        # the generated SQL.
        zip_values = [str(int(z)) for z in zips]
        if not zip_values:
            raise ValueError("At least one ZIP code is required")
        return textwrap.dedent(
            self._GET_POPULATIONS_TEMPLATE.format(
                var_1=self._VARIABLES['person_id'],
                var_2=self._VARIABLES['gender'],
                var_3=self._VARIABLES['birth_year'],
                var_4=self._VARIABLES['eligibility_start_date'],
                var_5=self._VARIABLES['eligibility_end_date'],
                var_6=self._VARIABLES['zip'],
                table_1=self._TABLES['person'],
                valueset_1=",".join(zip_values)))

    def _condition_sql(self, codes):
        """Renders the entry-criteria subquery for the diagnosis ``codes``."""
        return textwrap.dedent(
            self._GET_POP_CONDITION_TEMPLATE.format(
                var_1=self._VARIABLES['person_id'],
                var_2=self._VARIABLES['claim_id'],
                var_3=self._VARIABLES['diagnosis_code'],
                var_4=self._VARIABLES['icd_flag'],
                table_1=self._TABLES['diagnosis'],
                valueset_1=",".join(f'"{code}"' for code in codes)))

    def _build_filter_sql(self, criteria, year_expr):
        """Turns a criteria dict into SQL predicates joined with ``AND``.

        Args:
            criteria (:obj:`dict`): Items keyed by ``'gender'``,
                ``'ageGroup'``, ``'ageRange'`` or ``'ageAt'``; other keys
                are ignored.
            year_expr (:obj:`str`): SQL expression giving the reference year
                for age computation, or ``None`` when no reference date is
                available.

        Returns:
            The predicates joined with ``AND``; an empty string when no
            recognised key is present.
        """
        # Keyed by kind so that a later age constraint replaces an earlier one.
        filters = {}
        for key, value in criteria.items():
            if key == 'gender':
                gender_code = self._GENDERS[value]['value']
                filters['gender'] = f"gender_source_value = '{gender_code}'"
            elif key == 'ageGroup':
                age = self._age_expression(year_expr)
                group = self._AGE_GROUPS[value]
                filters['age'] = f"{age} >= {group['start']} AND {age} <= {group['end']}"
            elif key == 'ageRange':
                lower, upper = self._parse_age_range(value)
                age = self._age_expression(year_expr)
                filters['age'] = f"{age} >= {lower} AND {age} <= {upper}"
            elif key == 'ageAt':
                age = self._age_expression(year_expr)
                filters['age'] = f"{age} = {value}"
        return "\n AND ".join(filters.values())

    @staticmethod
    def _age_expression(year_expr):
        """Returns the SQL age expression, or raises if no date is known."""
        if year_expr is None:
            raise ValueError("Constraining the patient age requires the 'index_date' argument")
        return f"{year_expr}-year_of_birth"

    @staticmethod
    def _parse_age_range(expression):
        """Parses ``"16-22"``, ``"-22"`` or ``"22-"`` into inclusive bounds.

        A missing lower bound defaults to 0 and a missing upper bound to 999.
        """
        # fullmatch rejects trailing garbage such as "16-22abc" or "1-2-3",
        # which a prefix match would let through.
        match = re.fullmatch(r"([0-9]*)-([0-9]*)", expression.strip())
        if not match:
            raise ValueError("Unable to parse value range expression. Some valid examples: -22, or 16-22, or 22-")
        lower_text, upper_text = match.groups()
        lower = int(lower_text) if lower_text else 0
        upper = int(upper_text) if upper_text else 999
        return lower, upper
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment