# Goal: provide database functions for plug-and-play database use instead of flat files
from static import PATH_START, PATH_START_PERSONAL
from static import PATH_START_SERVER, PATH_START_PERSONAL_SERVER
from static import UNPAYWALL_EMAIL
import pandas as pd
import numpy as np
import requests
import calendar
import time
import re
from datetime import datetime
from pybliometrics.scopus import ScopusSearch
from pybliometrics.scopus import AbstractRetrieval
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import mysql.connector
from mysql.connector import Error
from mysql.connector import errorcode
# imports from our own import framework
from nlp_functions import faculty_finder
from nlp_functions import get_dist
from nlp_functions import corresponding_author_functions
from core_functions import add_year_and_month
from core_functions import get_scopus_abstract_info
from core_functions import get_first_chosen_affiliation_author  # ! i want all vu authors now : )
from core_functions import add_unpaywall_columns
from core_functions import my_timestamp
from core_functions import add_deal_info
from core_functions import make_types_native_basic
# not sure if all of these are used here, test it


class NumpyMySQLConverter(mysql.connector.conversion.MySQLConverter):
    """A mysql.connector converter that casts numpy scalar types to native Python."""

    def _float32_to_mysql(self, value):
        return float(value)

    def _float64_to_mysql(self, value):
        return float(value)

    def _int32_to_mysql(self, value):
        return int(value)

    def _int64_to_mysql(self, value):
        return int(value)


def pre_process_for_push(df_to_upload, primary_key_start):
    # id: continue numbering after the highest primary key already in the table
    df_to_upload = df_to_upload.reset_index().rename(columns={'index': 'id'})
    df_to_upload['id'] = df_to_upload.id + primary_key_start + 1  # ! primary key
    #
    # cut columns down to the oadata table schema
    df_to_upload = df_to_upload[['id',
                                 'aggregationType',
                                 'creator',
                                 'doi',
                                 'ff_match',
                                 'ff_match_subgroup',
                                 'first_affil_author',
                                 'fund_sponsor',
                                 'publicationName',
                                 'upw_oa_color_category',
                                 'deal_name',
                                 'deal_discount_verbose',
                                 'upw_oa_color_verbose',
                                 'deal_owner_verbose']]
    # edit types (rows are cast to native types again right before the push)
    df_to_upload.id = df_to_upload.id.astype('int')
    df_to_upload = df_to_upload.fillna('nan')  # stringify missing values for MySQL
    return df_to_upload


def process_df_to_list_to_push(df_to_upload):
    # convert each row to a tuple of native Python types so mysql.connector can bind them
    lst_to_push = []
    for ii in range(len(df_to_upload)):
        lst_to_push.append(tuple(make_types_native_basic(df_to_upload.iloc[ii].to_list())))
    return lst_to_push


def get_connection(host, database, user, pw):
    connection = mysql.connector.connect(host=host,
                                         database=database,
                                         user=user,
                                         password=pw)
    return connection
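
# A minimal usage sketch for get_connection; the host, database and credentials
# below are placeholders, not values from this repository:
#
#   connection = get_connection(host='localhost', database='mydb',
#                               user='oadata_user', pw='secret')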


def push_df_to_db(connection, df_to_upload):
    # only works for the test situation
    # care: the id increment is dealt with elsewhere (not nice)
    cursor = None
    try:
        connection.set_converter_class(NumpyMySQLConverter)  # avoid write issues with numpy dtypes
        cursor = connection.cursor()
        mySql_insert_query = """INSERT INTO oadata (id,
                                                    aggregationType,
                                                    creator,
                                                    doi,
                                                    ff_match,
                                                    ff_match_subgroup,
                                                    first_affil_author,
                                                    fund_sponsor,
                                                    publicationName,
                                                    upw_oa_color_category,
                                                    deal_name,
                                                    deal_discount_verbose,
                                                    upw_oa_color_verbose,
                                                    deal_owner_verbose)
                                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        records_to_insert = process_df_to_list_to_push(df_to_upload)
        cursor.executemany(mySql_insert_query, records_to_insert)
        connection.commit()
        print(cursor.rowcount, "record(s) inserted successfully into table")
    except mysql.connector.Error as error:
        print("Failed to insert record into MySQL table: {}".format(error))
    finally:
        if connection.is_connected():
            if cursor is not None:  # cursor() itself may have failed
                cursor.close()
            connection.close()
            print("MySQL connection is closed")


def run_query(connection, query, silent=False):
    # no idea if this works for all reads
    # note: closes the connection when done, so pass a fresh one per query
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(query)  # e.g. "SELECT * FROM oadata"
        records = cursor.fetchall()
        df_t = pd.DataFrame(records, columns=cursor.column_names)
        if silent is False:
            print("Total number of rows is: ", cursor.rowcount)
        success = True
    except Error as e:
        # always print this, later also add logging
        print("Error reading data from MySQL table", e)
        print('returning None')
        df_t = None
        success = False
    finally:
        if connection.is_connected():
            if cursor is not None:  # close the cursor before its connection
                cursor.close()
            connection.close()
            if silent is False:
                print("MySQL connection is closed")
    return df_t, success
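
# A minimal read sketch; the query below is illustrative only. run_query closes
# the connection it is given, so open a new one for each call:
#
#   df, ok = run_query(get_connection('localhost', 'mydb', 'oadata_user', 'secret'),
#                      "SELECT doi, deal_name FROM oadata LIMIT 10")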


def get_last_primary_key(connection):
    # read the highest id in oadata so new rows can continue the numbering
    df_pull, success = run_query(connection, "SELECT MAX(id) FROM mydb.oadata")
    last_primary_key = None
    if success:
        last_primary_key = df_pull.iloc[0, 0]  # None if the table is empty
    return last_primary_key, success
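

if __name__ == '__main__':
    # End-to-end sketch under assumptions: the localhost credentials and the
    # single example record below are placeholders, not values from this
    # repository. It shows the intended flow: read the last primary key,
    # pre-process a dataframe, then push it.
    example_df = pd.DataFrame([{'aggregationType': 'Journal',
                                'creator': 'Doe J.',
                                'doi': '10.1000/example',
                                'ff_match': 'Example Faculty',
                                'ff_match_subgroup': 'Example Group',
                                'first_affil_author': 'Doe J.',
                                'fund_sponsor': 'Example Funder',
                                'publicationName': 'Example Journal',
                                'upw_oa_color_category': 'gold',
                                'deal_name': 'Example Deal',
                                'deal_discount_verbose': '100%',
                                'upw_oa_color_verbose': 'gold',
                                'deal_owner_verbose': 'Example Owner'}])
    # read the current highest id (this call closes the connection it uses)
    last_key, success = get_last_primary_key(
        get_connection(host='localhost', database='mydb',
                       user='oadata_user', pw='secret'))
    if success:
        if last_key is None:  # empty table: start numbering at 1
            last_key = 0
        df_ready = pre_process_for_push(example_df, last_key)
        # push_df_to_db also closes its connection, so hand it a fresh one
        push_df_to_db(get_connection(host='localhost', database='mydb',
                                     user='oadata_user', pw='secret'),
                      df_ready)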