-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathrstock.py
194 lines (180 loc) · 9.08 KB
/
rstock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# -*- coding: utf-8 -*-
import time
import datetime
import const as ct
import numpy as np
import pandas as pd
from cmysql import CMySQL
from cstock import CStock
from gevent.pool import Pool
from functools import partial
from datetime import datetime
from base.clog import getLogger
from ccalendar import CCalendar
from cstock_info import CStockInfo
from collections import OrderedDict
from base.cdate import get_day_nday_ago, delta_days, get_dates_array
from common import create_redis_obj, queue_process_concurrent_run, is_df_has_unexpected_data
# Name returned by RIndexStock.get_dbname(): used as the MySQL database name,
# as the Redis set key that records created tables, and as the prefix of the
# per-quarter table names.
RINDEX_STOCK_INFO_DB = "rstock"
class RIndexStock:
    """Per-day, all-stock K-line store kept in MySQL and indexed in Redis.

    Rows live in one MySQL table per calendar quarter (see get_table_name).
    Redis sets record which tables have been created (set keyed by the
    database name) and which dates have been written (set keyed by the
    table name).
    """

    def __init__(self, dbinfo=ct.DB_INFO, redis_host=None):
        """Create the Redis, calendar and MySQL clients and ensure the DB exists.

        Args:
            dbinfo: connection settings forwarded to CCalendar and CMySQL.
            redis_host: Redis host; when None, create_redis_obj() is called
                without a host argument.

        Raises:
            Exception: if the rstock database cannot be created.
        """
        self.redis = create_redis_obj() if redis_host is None else create_redis_obj(host=redis_host)
        self.redis_host = redis_host
        self.dbname = self.get_dbname()
        self.logger = getLogger(__name__)
        self.cal_client = CCalendar(dbinfo=dbinfo, redis_host=redis_host, without_init=True)
        self.mysql_client = CMySQL(dbinfo, self.dbname, iredis=self.redis)
        if not self.mysql_client.create_db(self.dbname):
            raise Exception("init rstock database failed")

    @staticmethod
    def get_dbname():
        """Return the database name used by this store."""
        return RINDEX_STOCK_INFO_DB

    def get_table_name(self, cdate):
        """Return the quarterly table name for a 'YYYY-MM-DD' date.

        Example: '2019-12-02' -> '<dbname>_day_2019_4'.
        """
        parts = cdate.split('-')
        quarter = (int(parts[1]) - 1) // 3 + 1
        return "%s_day_%s_%s" % (self.get_dbname(), parts[0], quarter)

    def is_date_exists(self, table_name, cdate):
        """Return whether cdate is recorded in the Redis set named table_name."""
        if self.redis.exists(table_name):
            return self.redis.sismember(table_name, cdate)
        return False

    def is_table_exists(self, table_name):
        """Return whether table_name is recorded in the Redis set named after the DB."""
        if self.redis.exists(self.dbname):
            return self.redis.sismember(self.dbname, table_name)
        return False

    def create_table(self, table):
        """Create the day table if MySQL does not already list it.

        Returns:
            True when the table is already present, otherwise the result of
            mysql_client.create().
        """
        # Adjacent literals are concatenated without separators, so the
        # statement text is the same as the original single-string form.
        sql = ('create table if not exists %s(date varchar(10) not null,'
               'code varchar(10) not null,'
               'open float,'
               'high float,'
               'close float,'
               'preclose float,'
               'low float,'
               'volume float,'
               'amount float,'
               'outstanding float,'
               'totals float,'
               'adj float,'
               'aprice float,'
               'pchange float,'
               'turnover float,'
               'sai float,'
               'sri float,'
               'uprice float,'
               'sprice float,'
               'mprice float,'
               'lprice float,'
               'ppercent float,'
               'npercent float,'
               'base float,'
               'ibase bigint,'
               'breakup int,'
               'ibreakup bigint,'
               'pday int,'
               'profit float,'
               'gamekline float,'
               'PRIMARY KEY (date, code))') % table
        if table in self.mysql_client.get_all_tables():
            return True
        return self.mysql_client.create(sql, table)

    def get_k_data_in_range(self, start_date, end_date):
        """Load the stored rows for every trading day in a date range.

        The range is delta_days(start_date, end_date) consecutive calendar
        days starting at start_date; days the calendar client does not report
        as trading days are skipped. Dates are grouped by quarterly table so
        each table is queried once.

        Args:
            start_date: first day, 'YYYY-MM-DD'.
            end_date: last day, 'YYYY-MM-DD'.

        Returns:
            pandas.DataFrame with the concatenated query results; empty when
            no query returned data.
        """
        ndays = delta_days(start_date, end_date)
        days_by_table = OrderedDict()
        # Plain iteration (rather than np.vectorize) also works for an empty range.
        for day in pd.date_range(start_date, periods=ndays, freq='D'):
            day_str = day.strftime('%Y-%m-%d')
            if self.cal_client.is_trading_day(day_str):
                days_by_table.setdefault(self.get_table_name(day_str), []).append(day_str)

        frames = []
        for table_days in days_by_table.values():
            table_days = sorted(table_days)
            if len(table_days) == 1:
                df = self.get_data(table_days[0])
            else:
                # All dates in this group share one table, as get_data_between requires.
                df = self.get_data_between(table_days[0], table_days[-1])
            if df is not None:
                frames.append(df)
        # DataFrame.append was removed in pandas 2.0; concat once instead.
        return pd.concat(frames) if frames else pd.DataFrame()

    def get_data_between(self, start_date, end_date):
        """Query rows with start_date <= date <= end_date.

        Both dates must fall in the same quarterly table; the table is chosen
        from start_date only.
        """
        sql = "select * from %s where date between \"%s\" and \"%s\"" % (
            self.get_table_name(start_date), start_date, end_date)
        return self.mysql_client.get(sql)

    def get_data(self, cdate=None):
        """Query all rows stored for one date.

        Args:
            cdate: 'YYYY-MM-DD'; defaults to today's date, resolved at call
                time rather than once at import time.
        """
        if cdate is None:
            cdate = datetime.now().strftime('%Y-%m-%d')
        sql = "select * from %s where date=\"%s\"" % (self.get_table_name(cdate), cdate)
        return self.mysql_client.get(sql)

    def get_stock_data(self, cdate, code):
        """Return (code, data) where data is CStock(code).get_k_data(cdate)."""
        return (code, CStock(code).get_k_data(cdate))

    def _candidate_codes(self, black_list):
        """Return the stock codes from CStockInfo minus those in black_list."""
        codes = CStockInfo(redis_host=self.redis_host).get().code.tolist()
        if len(black_list) > 0:
            codes = list(set(codes).difference(set(black_list)))
        return codes

    def generate_all_data_1(self, cdate, black_list=ct.BLACK_LIST):
        """Fetch cdate data for all non-blacklisted codes via queue_process_concurrent_run.

        Returns whatever queue_process_concurrent_run returns.
        """
        codes = self._candidate_codes(black_list)
        cfunc = partial(self.get_stock_data, cdate)
        return queue_process_concurrent_run(cfunc, codes, num=500)

    def generate_all_data(self, cdate, black_list=ct.BLACK_LIST):
        """Fetch cdate data for all non-blacklisted codes with a gevent pool.

        Codes whose fetch returns None are retried until every code has
        produced data. When a full pass makes no progress the method sleeps
        600 seconds before trying again.

        Args:
            cdate: 'YYYY-MM-DD' date to fetch.
            black_list: codes to exclude.

        Returns:
            pandas.DataFrame of all fetched rows with a 'code' column added,
            de-duplicated, sorted by 'date' and re-indexed; an empty
            DataFrame when there were no codes to fetch.
        """
        pending = self._candidate_codes(black_list)
        fetch = partial(self.get_stock_data, cdate)
        frames = []
        obj_pool = Pool(5000)
        try:
            while pending:
                self.logger.info("all stock list:%s, cdate:%s", len(pending), cdate)
                before = len(pending)
                # Iterate over a snapshot: `pending` is mutated inside the loop
                # and must not be the list the pool is still consuming.
                for code, kdata in obj_pool.imap_unordered(fetch, list(pending)):
                    if kdata is None:
                        continue
                    kdata['code'] = code
                    frames.append(kdata)
                    pending.remove(code)
                if len(pending) != before:
                    self.logger.debug("last failed list:%s, current failed list:%s",
                                      before, len(pending))
                else:
                    # No code succeeded in this pass; back off before retrying.
                    time.sleep(600)
        finally:
            # Always release the greenlets, even if a fetch raised.
            obj_pool.join(timeout=5)
            obj_pool.kill()

        if not frames:
            return pd.DataFrame()
        all_df = pd.concat(frames).drop_duplicates()
        all_df = all_df.sort_values(by='date', ascending=True)
        return all_df.reset_index(drop=True)

    def update(self, end_date=None, num=30):
        """Store data for every trading day in the num days up to end_date.

        Args:
            end_date: 'YYYY-MM-DD'; defaults to today's date, resolved at
                call time rather than once at import time.
            num: how many days before end_date the window starts.

        Returns:
            True if set_day_data succeeded for every trading day, else False.
        """
        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')
        start_date = get_day_nday_ago(end_date, num=num, dformat="%Y-%m-%d")
        succeed = True
        for mdate in get_dates_array(start_date, end_date):
            if not self.cal_client.is_trading_day(mdate):
                continue
            if not self.set_day_data(mdate):
                self.logger.error("set %s data for rstock failed", mdate)
                succeed = False
        return succeed

    def set_day_data(self, cdate):
        """Generate and persist the all-stock data for one date.

        Creates the quarterly table when Redis does not list it, returns early
        when Redis already lists the date, and records the date in Redis only
        after the MySQL write succeeds.

        Returns:
            True when the date is already stored or was stored now; False on
            table-creation failure, empty or unexpected data, or a failed write.
        """
        table_name = self.get_table_name(cdate)
        if not self.is_table_exists(table_name):
            if not self.create_table(table_name):
                self.logger.error("create rstock table %s failed", table_name)
                return False
            self.redis.sadd(self.dbname, table_name)
        if self.is_date_exists(table_name, cdate):
            self.logger.debug("existed table:%s, date:%s", table_name, cdate)
            return True
        df = self.generate_all_data(cdate)
        # An empty frame must not be recorded as a successfully stored date.
        if df is None or df.empty:
            self.logger.error("no rstock data generated for %s", cdate)
            return False
        if is_df_has_unexpected_data(df):
            return False
        if self.mysql_client.set(df, table_name):
            self.redis.sadd(table_name, cdate)
            return True
        return False
if __name__ == '__main__':
    # Ad-hoc check: compare the codes having profit > 0 and pday > 0 on two
    # dates and print which codes appeared and which disappeared.
    first_day, last_day = '2019-12-02', '2019-12-06'
    store = RIndexStock(dbinfo=ct.OUT_DB_INFO, redis_host='127.0.0.1')
    code_sets = []
    for day in (first_day, last_day):
        frame = store.get_data(day)
        positive = frame[(frame['profit'] > 0) & (frame['pday'] > 0)]
        code_sets.append(set(positive['code'].tolist()))
    codes_before, codes_after = code_sets
    # "新增股票" = newly added stocks
    print("新增股票")
    print(codes_after - codes_before)
    # "减少股票" = removed stocks
    print("减少股票")
    print(codes_before - codes_after)