Merge pull request #2 from istat-methodology/dev

Refactored App

liminalfever authored Nov 25, 2024
2 parents 58dcd72 + 5796ebf commit a50e7e6

Showing 8 changed files with 700 additions and 487 deletions.
170 changes: 170 additions & 0 deletions batch.py
@@ -0,0 +1,170 @@
import json
import polars as pl
import logging
from modules import params

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

class BatchProcessing:

    def __init__(self):
        self.dfs = {}
        self.output = {}

    def _load_data(self, path):
        logging.info(f"[LOAD PARQUET] Loading Parquet file from \"{path}\"")
        self.dfs['-'] = pl.read_parquet(path)
        logging.info("[LOAD PARQUET] Parquet file loaded successfully!")

    def _classify_sentiment(self, score):
        if score is None:
            return None
        if score <= params.SENTIMENT_THRESHOLDS['t1']:
            return 'Strongly Negative'
        elif score <= params.SENTIMENT_THRESHOLDS['t2']:
            return 'Negative'
        elif score <= params.SENTIMENT_THRESHOLDS['t3']:
            return 'Neutral'
        elif score <= params.SENTIMENT_THRESHOLDS['t4']:
            return 'Positive'
        else:
            return 'Strongly Positive'

    def _add_sentiment_class(self):
        logging.info('[SENTIMENT CLASS] Extracting sentiment classes...')
        self.dfs['-'] = self.dfs['-'].with_columns(
            pl.col('SENTIMENT_SCORE').map_elements(
                self._classify_sentiment, return_dtype=pl.String
            ).alias('SENTIMENT_CLASS')
        )
        logging.info('[SENTIMENT CLASS] Sentiment classes extracted successfully!')

    def _get_subsets(self):
        logging.info("[WORD LIST TO STRING] Converting word lists to strings...")
        self.dfs['-'] = self.dfs['-'].with_columns(
            pl.col("WORD_LIST").cast(pl.List(pl.String)).list.join(", ").alias("TEXT_STR")
        )
        for group in params.SEMANTIC_GROUPS['GROUPS'].values():
            if group:
                logging.info(f"[DATA FILTERING] Filtering series by group \"{group}\"...")
                self.dfs[group] = self.dfs['-'].filter(pl.col('TEXT_STR').str.contains(group))

    def _aggregate_series(self):
        self.aggregate_dfs = {}
        for group, df in self.dfs.items():
            logging.info(f"[SERIES AGGREGATION] Aggregating series for group \"{group}\"...")
            grouped_df = df.group_by('DATE', maintain_order=True).agg([
                pl.col('SENTIMENT_SCORE').mean(),
                pl.col('SENTIMENT_CLASS').value_counts(),
                pl.col('TEXT_STR').len().alias('TOTAL_COUNT')
            ])
            sentiment_exploded = grouped_df.explode('SENTIMENT_CLASS').unnest('SENTIMENT_CLASS')
            sentiment_pivot = sentiment_exploded.pivot(
                values="count",
                index="DATE",
                on=["SENTIMENT_CLASS"],
                aggregate_function="sum"
            )
            sentiment_pivot = sentiment_pivot.with_columns(
                [pl.col(col).cast(pl.Int64).alias(col) for col in df['SENTIMENT_CLASS'].unique()]
            )
            self.aggregate_dfs[group] = sentiment_pivot.with_columns(
                grouped_df["SENTIMENT_SCORE"],
                (((sentiment_pivot["Positive"] + sentiment_pivot["Strongly Positive"]) -
                  (sentiment_pivot["Negative"] + sentiment_pivot["Strongly Negative"])) /
                 grouped_df["TOTAL_COUNT"]).alias('QUALITATIVE_SCORE'),
                grouped_df["TOTAL_COUNT"].cast(pl.Int64)
            )

    def _get_quarters(self):
        logging.info("[EXTRACT QUARTERS] Extracting quarters...")
        self.freq_dfs = {}
        for group, df in self.dfs.items():
            df = df.select(['DATE', 'SENTIMENT_CLASS', 'WORD_LIST'])
            df = df.with_columns(pl.col("DATE").str.strptime(pl.Date, "%Y-%m-%d"))
            self.freq_dfs[group] = df.with_columns(
                ("Q" + pl.col("DATE").dt.quarter().cast(pl.Utf8) + pl.col("DATE").dt.year().cast(pl.Utf8)).alias("QUARTER")
            )

    def _word_frequencies(self, n_top_words):
        logging.info('[WORD FREQUENCY] Starting word frequency procedure.')
        self.word_freqs = {}

        for group, df in self.freq_dfs.items():
            self.word_freqs[group] = {}

            if group == '-':
                classes = list(df['SENTIMENT_CLASS'].unique()) + ['Total']
            else:
                classes = ['Total']

            for quarter in df["QUARTER"].unique():
                self.word_freqs[group][quarter] = {}
                quarter_df = df.filter(pl.col("QUARTER") == quarter)

                for sent_class in classes:
                    logging.info(f'[WORD FREQUENCY] Processing "{sent_class} | {quarter}" for group "{group}"...')
                    self.word_freqs[group][quarter][sent_class] = {}

                    if sent_class != 'Total':
                        sent_df = quarter_df.filter(pl.col("SENTIMENT_CLASS") == sent_class)
                    else:
                        sent_df = quarter_df

                    # explode the word list so each row holds a single word
                    sent_df = sent_df.explode("WORD_LIST").rename({"WORD_LIST": "WORD"})

                    # count the word frequencies
                    word_counts = sent_df.group_by(['QUARTER', 'WORD']).len().rename({'len': 'FREQUENCY'})

                    # extract the top n_top_words words
                    top_words = (
                        word_counts
                        .sort(["QUARTER", "FREQUENCY"], descending=[False, True])
                        .group_by("QUARTER")
                        .head(n_top_words)
                    )

                    # build a dictionary mapping each word to its count
                    df_temp = top_words.select('WORD', 'FREQUENCY').to_dict(as_series=False)
                    self.word_freqs[group][quarter][sent_class] = dict(zip(df_temp['WORD'], df_temp['FREQUENCY']))

    def _export(self, output_dir: str):
        logging.info(f'[EXPORT] Exporting data to "{output_dir}"...')
        time_series = {}

        for group, df in self.aggregate_dfs.items():
            time_series[group] = df.sort('DATE').to_dict(as_series=False)

        self.output['time_series'] = time_series
        self.output['word_frequencies'] = self.word_freqs

        with open(output_dir + 'dashboard_data.json', 'w') as fp:
            json.dump(self.output, fp)

    def run(self, path_to_file: str, n_top_words: int = 30, output_dir: str = 'data/'):
        self._load_data(path_to_file)
        self._add_sentiment_class()
        self._get_subsets()
        self._aggregate_series()
        self._get_quarters()
        self._word_frequencies(n_top_words)
        self._export(output_dir)
        logging.info("[COMPLETED] Batch procedure completed successfully!")


if __name__ == "__main__":
    path_to_file = "data/full_polars_df.parquet"
    output_dir = "data/"
    n_top_words = 30

    processor = BatchProcessing()
    processor.run(path_to_file, n_top_words=n_top_words, output_dir=output_dir)
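
batch.py depends on two structures from modules/params that this view does not include: SENTIMENT_THRESHOLDS, read by _classify_sentiment, and SEMANTIC_GROUPS, read by _get_subsets. A minimal sketch of the shapes the script assumes; the cut points and the group pattern below are illustrative placeholders, not the repository's actual values.

# Hypothetical excerpt of modules/params.py, inferred from batch.py's usage.
# _classify_sentiment needs four ascending cut points t1 < t2 < t3 < t4.
SENTIMENT_THRESHOLDS = {'t1': -0.6, 't2': -0.2, 't3': 0.2, 't4': 0.6}

# _get_subsets iterates over GROUPS.values(); each truthy value is used both
# as a subset key and as the regex passed to pl.col('TEXT_STR').str.contains().
SEMANTIC_GROUPS = {
    'GROUPS': {
        '-': None,                  # falsy, so no subset is built for the full corpus
        'inflazione': 'inflazion',  # placeholder label and pattern
    }
}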
32 changes: 21 additions & 11 deletions main.py
@@ -1,22 +1,32 @@
 import streamlit as st
-from modules import bits, tabs, params
+import streamlit_nested_layout
+from modules import bits, plots
 
 bits.set_configs()
-bits.set_session_states()
 bits.set_header()
+bits.set_session_states()
 bits.load_data()
 bits.global_filters()
+bits.get_filtered_data()
 
 overview_tab, frequency_tab = st.tabs(['Overview', 'Frequency Analysis'])
 
-if st.session_state['data_ready']:
-    with overview_tab:
-        tabs.OverviewTab().add()
+with overview_tab:
+    col1, col2 = st.columns(2)
+    with col1:
+        plots.VolumePlot().add()
+    with col2:
+        plots.QuantitativePlot().add()
 
-    with frequency_tab:
-        tabs.FrequencyTab().add()
+with frequency_tab:
+    col1, col2 = st.columns(2)
+    with col1:
+        plots.QualitativePlot().add()
+    with col2:
+        plots.SentimentClassTS().add()
 
-    with st.sidebar:
-        tabs.Sidebar().add()
-else:
-    st.warning(params.WARNINGS['no_data'])
+    col3, col4 = st.columns(2)
+    with col3:
+        plots.WordFrequencyPlot().add()
+    with col4:
+        plots.SentimentPie().add()
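
Two notes on the rewritten main.py: streamlit_nested_layout is imported only for its side effect, patching Streamlit so that columns can nest inside tabs and other containers, and the old tabs.OverviewTab / tabs.FrequencyTab composites give way to one small class per chart, each exposing an add() method that renders into whichever container is active. The plots module is not part of the files shown; a minimal sketch of the pattern it appears to follow, with the chart body as a placeholder assumption:

# Hypothetical sketch of the component pattern plots.py appears to follow;
# the real module is not shown in this commit view.
import streamlit as st

class VolumePlot:
    def add(self):
        # Runs inside a `with tab:` or `with col:` block, so plain st.*
        # calls render into that container.
        data = st.session_state['filtered_data']['data']  # set by bits.get_filtered_data
        st.subheader('Volume')  # placeholder title
        st.line_chart(data)     # placeholder body; the real chart logic is not shown

Because add() issues plain st.* calls, the same component works unchanged in a tab, a column, or the sidebar.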
83 changes: 47 additions & 36 deletions modules/bits.py
@@ -1,10 +1,19 @@
 import streamlit as st
-from modules.utils import Corpus
+from modules.utils import Data
 from modules import params
 
 def set_configs():
     st.set_page_config(page_title=params.PAGE_CONFIGS['TITLE'], layout=params.PAGE_CONFIGS['LAYOUT'], initial_sidebar_state=params.PAGE_CONFIGS['SIDEBAR'])
 
+def load_data():
+    if st.session_state['initialized'] is False:
+        with st.spinner("Caricamento dei dati in corso..."):
+            st.session_state['data'] = Data(
+                path = params.PATHS['JSON_PATH']
+            )
+            st.session_state['initialized'] = True
+            st.toast("Dati caricati con successo!", icon="✅")
+
 def set_session_states():
     for state, default in params.SESSION_STATES.items():
         if state not in st.session_state:
@@ -15,44 +24,46 @@ def set_header():
     st.markdown(params.HEADER['SUBTITLE'])
 
 def global_filters():
-    col1, col2, col3, _ = st.columns([1, 1, 2, 4])
-    with col1:
-        st.date_input('Da', params.GLOBAL_FILTERS['default_start_date'], min_value=params.GLOBAL_FILTERS['min_start_date'], max_value=st.session_state['end_date'], key='start_date')
-    with col2:
-        st.date_input('A', min_value=st.session_state['start_date'], max_value=params.GLOBAL_FILTERS['max_end_date'], key='end_date')
-    with col3:
+    year, quarter, group, _ = st.columns([2, 2, 3, 7])
+    with year:
+        st.selectbox(
+            label = params.YEAR_FILTER['LABEL'],
+            options = params.YEAR_FILTER['YEAR'],
+            key = params.YEAR_FILTER['KEY'],
+            help = params.YEAR_FILTER['HELPER']
+        )
+    with quarter:
+        st.selectbox(
+            label = params.QUARTER_FILTER['LABEL'],
+            options = params.QUARTER_FILTER['QUARTER'][st.session_state[params.YEAR_FILTER['KEY']]],
+            key = params.QUARTER_FILTER['KEY'],
+            help = params.QUARTER_FILTER['HELPER']
+        )
+    with group:
         st.selectbox(
-            'Gruppo semantico',
+            label = params.SEMANTIC_GROUPS['LABEL'],
             options=params.SEMANTIC_GROUPS['GROUPS'].keys(),
-            key='semantic_group_tmp',
+            key=params.SEMANTIC_GROUPS['KEY'],
             help=params.SEMANTIC_GROUPS['HELPER']
         )
 
-    def update_data():
-        st.session_state['daily_stats'] = st.session_state['full_df'].filter_data(
-            start_date=st.session_state['start_date'],
-            end_date=st.session_state['end_date'],
-            word_filter=params.SEMANTIC_GROUPS['GROUPS'][st.session_state['semantic_group_tmp']]
-        )
-        if st.session_state['semantic_group_tmp'] == '-':
-            st.session_state['disable_comparison'] = True
-        else:
-            st.session_state['disable_comparison'] = False
-        st.session_state['data_ready'] = True
-        st.session_state['semantic_group'] = st.session_state['semantic_group_tmp']
-
-    st.button("Filtra", on_click=update_data)
-
-def load_data():
-    if st.session_state['initialized'] is False:
-        with st.spinner("Caricamento dei dati in corso..."):
-            st.session_state['full_df'] = Corpus(
-                path=params.LOADING_PARAMS['data_path'],
-                date_column=params.LOADING_PARAMS['date_column'],
-                sentiment_column=params.LOADING_PARAMS['sentiment_column'],
-                class_column=params.LOADING_PARAMS['class_column'],
-                text_column=params.LOADING_PARAMS['text_column']
-            )
-            st.session_state['initialized'] = True
-            st.toast("Dati caricati con successo!", icon="✅")
+def get_filtered_data():
+    filtered_data = st.session_state['data'].filter_data(
+        year = st.session_state[params.YEAR_FILTER['KEY']],
+        quarter = st.session_state[params.QUARTER_FILTER['KEY']],
+        semantic_group = params.SEMANTIC_GROUPS['GROUPS'][st.session_state[params.SEMANTIC_GROUPS['KEY']]]
+    )
+    benchmark_data = st.session_state['data'].filter_data(
+        year = st.session_state[params.YEAR_FILTER['KEY']],
+        quarter = st.session_state[params.QUARTER_FILTER['KEY']],
+        semantic_group = '-'
+    )
+    if st.session_state['semantic_group'] == '-':
+        st.session_state['disable_comparison'] = True
+    else:
+        st.session_state['disable_comparison'] = False
+
+    st.session_state['filtered_data'] = {
+        'data': filtered_data,
+        'benchmark': benchmark_data
+    }
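
get_filtered_data runs the same filter twice, once with the selected group's pattern and once with '-', the whole-corpus key, so every view can draw a benchmark series next to the filtered one; the comparison is disabled when the selection already covers the whole corpus. The Data class lives in modules/utils, which this view does not show. A minimal sketch of the interface bits.py relies on, assuming the JSON layout produced by batch.py's _export and treating the slicing logic as a placeholder:

# Hypothetical sketch of modules/utils.Data, inferred from how bits.py calls
# it and from the JSON batch.py exports; the real implementation is not shown.
import json

class Data:
    def __init__(self, path: str):
        with open(path) as fp:
            # {'time_series': {group: ...}, 'word_frequencies': {group: {quarter: ...}}}
            self._payload = json.load(fp)

    def filter_data(self, year, quarter, semantic_group):
        # Placeholder slicing: quarter keys follow batch.py's "Q<q><year>" format.
        quarter_key = f'Q{quarter}{year}'
        return {
            'time_series': self._payload['time_series'].get(semantic_group),
            'word_frequencies': self._payload['word_frequencies']
                                    .get(semantic_group, {})
                                    .get(quarter_key, {}),
        }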