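"""Database helpers for the ia2fil Streamlit dashboard.

Each function renders a SQL query against the deal oracle's published_deals
and global tables (schema "naive") and returns the result as a pandas
DataFrame via load_oracle.
"""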
import os

import pandas as pd
import psycopg2
import streamlit as st

from utils import int_client_id


def client_id_query(client_ids):
    """Return a SQL fragment matching any of the given client IDs."""
    # Parenthesize the OR chain so it binds correctly when callers below
    # AND it with further conditions.
    return "(" + " OR ".join(
        "client_id = '{client_id}'".format(client_id=int_client_id(i))
        for i in client_ids
    ) + ")"


# Database queries

def top_clients_for_last_week(first_day, last_day, top_n=10):
    """Top clients by total on-chain deal size (GiB) in the given window."""
    return load_oracle(
        """
        SELECT client_id, SUM((1::BIGINT << claimed_log2_size) / 1024 / 1024 / 1024) AS size
        FROM published_deals
        WHERE (status = 'active' OR status = 'published')
          AND ts_from_epoch(sector_start_rounded) BETWEEN '{fday}' AND '{lday}'
        GROUP BY client_id
        ORDER BY size DESC
        LIMIT {top_n};
        """.format(fday=first_day, lday=last_day, top_n=top_n)
    ).rename(columns={"client_id": "Client ID", "size": "Onchain"})
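

# The helpers here interpolate values directly into the SQL text, which is
# acceptable for the dashboard's trusted inputs. As a minimal sketch (not
# part of this module's API), the same query could use psycopg2 parameter
# binding instead; `conn` is assumed to be an open psycopg2 connection:
#
#     def top_clients_parameterized(conn, first_day, last_day, top_n=10):
#         q = """
#             SELECT client_id, SUM((1::BIGINT << claimed_log2_size) / 1024 / 1024 / 1024) AS size
#             FROM published_deals
#             WHERE (status = 'active' OR status = 'published')
#               AND ts_from_epoch(sector_start_rounded) BETWEEN %s AND %s
#             GROUP BY client_id
#             ORDER BY size DESC
#             LIMIT %s;
#         """
#         return pd.read_sql_query(q, conn, params=(first_day, last_day, top_n))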


def active_or_published_daily_size(first_day, last_day, client_ids):
    """Daily on-chain size (GiB) and piece count per client in the window."""
    df = load_oracle(
        """
        SELECT sq.client_id AS client_id,
               DATE_TRUNC('day', sq.ts_from_epoch) AS dy,
               SUM((1::BIGINT << sq.claimed_log2_size) / 1024 / 1024 / 1024) AS size,
               COUNT(sq.claimed_log2_size) AS pieces
        FROM (
            SELECT client_id, piece_id, ts_from_epoch(sector_start_rounded), claimed_log2_size
            FROM published_deals
            WHERE
        """
        + client_id_query(client_ids)
        + """
              AND (status = 'active' OR status = 'published')
              AND ts_from_epoch(sector_start_rounded) BETWEEN '{fday}' AND '{lday}'
            ORDER BY piece_id, entry_created
        ) sq
        GROUP BY sq.client_id, DATE_TRUNC('day', sq.ts_from_epoch);
        """.format(fday=first_day, lday=last_day)
    ).rename(columns={"dy": "PTime", "size": "Onchain", "pieces": "Pieces"})
    # Drop timezone info for plotting and prefix the numeric actor ID with "f0".
    df["Day"] = pd.to_datetime(df.PTime).dt.tz_localize(None)
    df["client_id"] = "f0" + df["client_id"].astype(str)
    return df


def total_active_or_published_daily_size():
    """Daily on-chain size (GiB) and piece count across all clients."""
    df = load_oracle(
        """
        SELECT DATE_TRUNC('day', sq.ts_from_epoch) AS dy,
               SUM((1::BIGINT << sq.claimed_log2_size) / 1024 / 1024 / 1024) AS size,
               COUNT(sq.claimed_log2_size) AS pieces
        FROM (
            SELECT piece_id, ts_from_epoch(sector_start_rounded), claimed_log2_size
            FROM published_deals
            WHERE (status = 'active' OR status = 'published')
            ORDER BY piece_id, entry_created
        ) sq
        GROUP BY DATE_TRUNC('day', sq.ts_from_epoch);
        """
    ).rename(columns={"dy": "PTime", "size": "Onchain", "pieces": "Pieces"})
    df["Day"] = pd.to_datetime(df.PTime).dt.tz_localize(None)
    return df


def copies_count_size(first_day, last_day, client_ids):
    """Replica distribution: piece count and total size (GiB) per copy count.

    A piece stored under N deals is grouped once, with copies = N.
    """
    return load_oracle(
        """
        SELECT sq.copies, COUNT(sq.copies), SUM((1::BIGINT << sq.sz) / 1024 / 1024 / 1024) AS size
        FROM (
            SELECT COUNT(piece_id) AS copies, MAX(claimed_log2_size) AS sz
            FROM published_deals
            WHERE
        """
        + client_id_query(client_ids)
        + """
              AND (status = 'active' OR status = 'published')
              AND ts_from_epoch(sector_start_rounded) BETWEEN '{fday}' AND '{lday}'
            GROUP BY piece_id
        ) sq
        GROUP BY copies;
        """.format(fday=first_day, lday=last_day)
    ).rename(columns={"copies": "Copies", "count": "Count", "size": "Size"})


def provider_item_counts(first_day, last_day, client_ids):
    """Deal count per storage provider for the given clients and window."""
    return load_oracle(
        """
        SELECT provider_id, COUNT(1) AS cnt
        FROM published_deals
        WHERE
        """
        + client_id_query(client_ids)
        + """
          AND ts_from_epoch(sector_start_rounded) BETWEEN '{fday}' AND '{lday}'
        GROUP BY provider_id
        ORDER BY cnt DESC;
        """.format(fday=first_day, lday=last_day)
    ).rename(columns={"provider_id": "Provider", "cnt": "Count"})


def deal_count_by_status(first_day, last_day, client_ids):
    """Deal count per status for the given clients and window."""
    return load_oracle(
        """
        SELECT status, COUNT(1)
        FROM published_deals
        WHERE
        """
        + client_id_query(client_ids)
        + """
          AND ts_from_epoch(sector_start_rounded) BETWEEN '{fday}' AND '{lday}'
        GROUP BY status;
        """.format(fday=first_day, lday=last_day)
    ).rename(columns={"status": "Status", "count": "Count"})


def terminated_deal_count_by_reason(first_day, last_day, client_ids):
    """Terminated-deal count per termination reason, with friendlier labels."""
    # DataFrame.replace matches whole cell values, so only the exact reason
    # strings below are relabeled.
    return load_oracle(
        """
        SELECT published_deal_meta->>'termination_reason' AS reason, COUNT(1)
        FROM published_deals
        WHERE
        """
        + client_id_query(client_ids)
        + """
          AND status = 'terminated'
          AND ts_from_epoch(sector_start_rounded) BETWEEN '{fday}' AND '{lday}'
        GROUP BY reason;
        """.format(fday=first_day, lday=last_day)
    ).rename(columns={"reason": "Reason", "count": "Count"}) \
     .replace("deal no longer part of market-actor state", "expired") \
     .replace("entered on-chain final-slashed state", "slashed")


def index_age():
    """Timestamp of the market-state epoch the oracle index was built from."""
    return load_oracle(
        """
        SELECT ts_from_epoch((metadata->'market_state'->'epoch')::INTEGER)
        FROM global;
        """
    )


@st.cache_data(ttl=3600, show_spinner="Loading Oracle Results...")
def load_oracle(dbq):
    """Run a query against the oracle DB and cache the DataFrame for an hour.

    Connection parameters come from the DBNAME, DBHOST, DBUSER, DBPASS and
    DBPORT environment variables; all tables live in the "naive" schema.
    """
    with psycopg2.connect(database=os.getenv("DBNAME"), host=os.getenv("DBHOST"),
                          user=os.getenv("DBUSER"), password=os.getenv("DBPASS"),
                          port=os.getenv("DBPORT")) as conn:
        # Point the session at the oracle's schema before running the query.
        conn.cursor().execute("SET SEARCH_PATH = naive;")
        return pd.read_sql_query(dbq, conn)
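

if __name__ == "__main__":
    # Minimal smoke test; a sketch assuming the DBNAME/DBHOST/DBUSER/DBPASS/
    # DBPORT environment variables point at a reachable oracle database. The
    # dates below are placeholders, not values the dashboard itself uses.
    print(index_age())
    print(top_clients_for_last_week("2024-01-01", "2024-01-07", top_n=5))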