# Databricks notebook source
# MAGIC %md
# MAGIC # VaR Aggregation
# MAGIC In this notebook, we demonstrate the versatility of our Monte Carlo simulation on **Delta Lake**. Because trials are stored in their most granular form, analysts can slice and dice the data and aggregate value-at-risk on demand using vector aggregation functions from **Spark ML**.
# COMMAND ----------
# MAGIC %run ./config/configure_notebook
# COMMAND ----------
from utils.var_udf import weighted_returns
trials_df = spark.read.table(config['database']['tables']['mc_trials'])
simulation_df = (
    trials_df
    .join(spark.createDataFrame(portfolio_df), ['ticker'])
    .withColumn('weighted_returns', weighted_returns('returns', 'weight'))
)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Point in time VaR
# MAGIC With all simulations stored at the finest granularity, we can access the slice for a given day and retrieve the associated value at risk with a simple quantile function. We aggregate trial vectors across the entire portfolio using `Summarizer`, a built-in Spark ML function.
# COMMAND ----------
from pyspark.sql import functions as F
min_date = trials_df.select(F.min('date').alias('date')).toPandas().iloc[0].date
# COMMAND ----------
from pyspark.ml.stat import Summarizer
point_in_time_vector = (
    simulation_df
    .filter(F.col('date') == min_date)
    .groupBy('date')
    .agg(Summarizer.sum(F.col('weighted_returns')).alias('returns'))
    .toPandas().iloc[0].returns.toArray()
)
# COMMAND ----------
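# MAGIC %md
# MAGIC The quantile lookup described above can be sketched in plain Python, without a cluster. This is a minimal illustration, not the implementation behind `plot_var` or `get_var_udf`, whose source is not shown in this notebook. The helpers `sum_vectors` and `value_at_risk` are hypothetical names, the returns are randomly generated, and the nearest-rank quantile is an assumption; the repository's helpers may interpolate differently.
# COMMAND ----------
```python
import random


def sum_vectors(vectors):
    """Element-wise sum of equally sized trial vectors (hypothetical helper)."""
    return [sum(values) for values in zip(*vectors)]


def value_at_risk(returns, confidence=99):
    """Nearest-rank estimate of the (100 - confidence)th percentile (an assumption)."""
    ordered = sorted(returns)
    # Position of the cutoff, e.g. the 1% quantile when confidence=99
    cutoff = int(len(ordered) * (100 - confidence) / 100)
    return ordered[min(cutoff, len(ordered) - 1)]


random.seed(42)
n_trials = 1000

# Two illustrative instruments, each with 1,000 made-up weighted returns
trials = [[random.gauss(0.0, 0.01) for _ in range(n_trials)] for _ in range(2)]

# One portfolio-level vector: trial i of every instrument is added together
portfolio = sum_vectors(trials)
var_99 = value_at_risk(portfolio, 99)
print(f"99% VaR over {n_trials} trials: {var_99:.4f}")
```
# COMMAND ----------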
from utils.var_viz import plot_var
plot_var(point_in_time_vector, 99)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Evolution of risk exposure
# MAGIC The same can be achieved at scale over our entire trading history. For each date, we aggregate all trial vectors and extract the 99% value at risk from the worst 1% of simulated events.
# COMMAND ----------
from utils.var_udf import get_var_udf
risk_exposure = (
    simulation_df
    .groupBy('date')
    .agg(Summarizer.sum(F.col('weighted_returns')).alias('returns'))
    .withColumn('var_99', get_var_udf(F.col('returns'), F.lit(99)))
    .drop('returns')
    .orderBy('date')
    .toPandas()
)
# COMMAND ----------
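# MAGIC %md
# MAGIC As a rough plain-Python picture of the group-then-quantile pattern used above: rows are grouped by date, their trial vectors are summed element-wise, and a quantile is taken per date. The function `var_by_date` is a hypothetical name, the integer returns (read them as basis points) are made up, and the nearest-rank cutoff is an assumption rather than the behavior of `get_var_udf`.
# COMMAND ----------
```python
from collections import defaultdict


def var_by_date(rows, confidence=99):
    """Group (date, vector) rows by date, sum vectors, take a nearest-rank quantile."""
    grouped = defaultdict(list)
    for date, vector in rows:
        grouped[date].append(vector)

    result = {}
    for date in sorted(grouped):
        # Element-wise sum across all instruments held on that date
        total = [sum(values) for values in zip(*grouped[date])]
        ordered = sorted(total)
        cutoff = int(len(ordered) * (100 - confidence) / 100)
        result[date] = ordered[min(cutoff, len(ordered) - 1)]
    return result


# Made-up data: 100 trials per vector, two instruments on day one, one on day two
rows = [
    ('2020-01-01', list(range(-50, 50))),
    ('2020-01-01', list(range(-50, 50))),
    ('2020-01-02', [3 * r for r in range(-50, 50)]),
]
exposure = var_by_date(rows)
print(exposure)
```
# COMMAND ----------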
import matplotlib.pyplot as plt
plt.figure(figsize=(20,8))
plt.plot(risk_exposure['date'], risk_exposure['var_99'])
plt.title('VaR across the entire portfolio')
plt.ylabel('value at risk')
plt.xlabel('date')
plt.show()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Slice and Dice
# MAGIC The main advantage of keeping Monte Carlo data at its finest granularity is the ability to slice, dice, and visualize different segments, industries, and countries. Using optimized Delta tables, portfolio managers and risk analysts can efficiently run what-if scenarios and ad hoc analyses, such as value-at-risk aggregation by country of operation.
# COMMAND ----------
risk_exposure_country = (
    simulation_df
    .groupBy('date', 'country')
    .agg(Summarizer.sum(F.col('weighted_returns')).alias('returns'))
    .withColumn('var_99', get_var_udf(F.col('returns'), F.lit(99)))
    .drop('returns')
    .orderBy('date')
    .toPandas()
)
# COMMAND ----------
fig, ax = plt.subplots(figsize=(20,8))
for label, df in risk_exposure_country.groupby('country'):
    df.plot.line(x='date', y='var_99', ax=ax, label=label)
plt.title('VaR by country')
plt.ylabel('value at risk')
plt.xlabel('date')
plt.show()
# COMMAND ----------
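# MAGIC %md
# MAGIC One caveat when reading per-segment figures: value at risk is generally not additive, so per-country values need not sum to the portfolio-level figure. The sketch below uses made-up integer returns for two hypothetical countries whose outcomes offset each other, and a nearest-rank quantile that is an assumption rather than the logic of `get_var_udf`.
# COMMAND ----------
```python
def value_at_risk(returns, confidence=99):
    """Nearest-rank estimate of the (100 - confidence)th percentile (an assumption)."""
    ordered = sorted(returns)
    cutoff = int(len(ordered) * (100 - confidence) / 100)
    return ordered[min(cutoff, len(ordered) - 1)]


# Made-up trial vectors (100 trials each); country_b moves against country_a
country_a = list(range(-50, 50))
country_b = [-r - 1 for r in country_a]

# Portfolio vector: trial i of both segments is added together first
portfolio = [a + b for a, b in zip(country_a, country_b)]

var_a = value_at_risk(country_a)
var_b = value_at_risk(country_b)
var_portfolio = value_at_risk(portfolio)

print(f"sum of segment VaRs: {var_a + var_b}, portfolio VaR: {var_portfolio}")
```
# COMMAND ----------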
# MAGIC %md
# MAGIC The same can be translated as a risk contribution by industry for a given country. How much of my overall risk is linked to my investment in the mining industry? How would I reduce this exposure by rebalancing my portfolio?
# COMMAND ----------
risk_exposure_industry = (
    simulation_df
    .filter(F.col('country') == 'PERU')
    .groupBy('date', 'industry')
    .agg(Summarizer.sum(F.col('weighted_returns')).alias('returns'))
    .withColumn('var_99', get_var_udf(F.col('returns'), F.lit(99)))
    .drop('returns')
    .orderBy('date')
    .toPandas()
)
# COMMAND ----------
import pandas as pd
# Pivot to one row per date and one column per industry
risk_contribution_country = pd.crosstab(
    risk_exposure_industry['date'],
    risk_exposure_industry['industry'],
    values=risk_exposure_industry['var_99'], aggfunc='sum'
)
# Normalize each row so that industry shares sum to 1 for every date
risk_contribution_country = risk_contribution_country.div(risk_contribution_country.sum(axis=1), axis=0)
risk_contribution_country.plot.bar(figsize=(20,8), colormap="Pastel1", stacked=True, width=0.9)
# COMMAND ----------
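# MAGIC %md
# MAGIC The row normalization in the previous cell can be pictured in plain Python as follows. The function `contribution_shares` is a hypothetical name and the VaR values are made up. Note that each share is relative to the sum of the industry-level VaR values for that date, which is not necessarily the VaR of the country as a whole.
# COMMAND ----------
```python
def contribution_shares(rows):
    """rows: iterable of (date, industry, var_99). Returns {date: {industry: share}}."""
    table = {}
    for date, industry, var in rows:
        by_industry = table.setdefault(date, {})
        by_industry[industry] = by_industry.get(industry, 0.0) + var

    shares = {}
    for date, by_industry in table.items():
        # Divide each industry's VaR by the row total so shares sum to 1 per date
        total = sum(by_industry.values())
        shares[date] = {name: value / total for name, value in by_industry.items()}
    return shares


# Made-up per-industry VaR values for two dates
rows = [
    ('2020-01-01', 'mining', -30.0),
    ('2020-01-01', 'banking', -10.0),
    ('2020-01-02', 'mining', -20.0),
    ('2020-01-02', 'banking', -20.0),
]
shares = contribution_shares(rows)
print(shares)
```
# COMMAND ----------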