From 65efdac8ac126e5d055efd289aa107629dea236b Mon Sep 17 00:00:00 2001 From: Nigel Nindo <99314049+nigelnindodev@users.noreply.github.com> Date: Thu, 19 Oct 2023 15:57:21 +0300 Subject: [PATCH] Feat: Store Orbit Game Events To DB (#8) * Refactor main game events logic into base class (untested) * minor newline fix * Get orbit data to store (but still bug with odds calculation) * correct orbit odds avergaing --- src/core/game_events/betika/index.ts | 87 ----------------------- src/core/game_events/index.ts | 95 ++++++++++++++++++++++++++ src/core/game_events/orbit/index.ts | 12 ++++ src/core/parsers/betika/index.ts | 8 +-- src/core/parsers/orbit/index.ts | 78 +++++++++++++++++---- src/core/parsers/orbit/parser_types.ts | 4 +- src/core/scrapping/orbit/index.ts | 1 - src/testbed/testbed_3.ts | 6 +- 8 files changed, 182 insertions(+), 109 deletions(-) create mode 100644 src/core/game_events/orbit/index.ts diff --git a/src/core/game_events/betika/index.ts b/src/core/game_events/betika/index.ts index 0c26a54..d892641 100644 --- a/src/core/game_events/betika/index.ts +++ b/src/core/game_events/betika/index.ts @@ -1,17 +1,6 @@ import { BaseGameEventsProcessor } from ".."; -import { getConfig } from "../../.."; import { BetProvider } from "../../../bet_providers"; import { BetikaProvider } from "../../../bet_providers/betika"; -import { PostgresDataSourceSingleton } from "../../../datastores/postgres"; -import { insertThreeWayGameEvent } from "../../../datastores/postgres/queries/three_way_game_event"; -import { insertTwoWayGameEvent } from "../../../datastores/postgres/queries/two_way_game_event"; -import { RedisSingleton } from "../../../datastores/redis"; -import { getRedisProcessedEventsChannelName } from "../../../utils/redis"; -import { BetTypes, ProcessedGameEvents } from "../../../utils/types/common"; -import { DbThreeWayGameEvent, DbTwoWayGameEvent } from "../../../utils/types/db"; -import { Result } from "../../../utils/types/result_type"; - -const {logger} = getConfig(); export class BetikaGameEventsProcessor extends BaseGameEventsProcessor { public override betProvider: BetProvider; @@ -20,80 +9,4 @@ export class BetikaGameEventsProcessor extends BaseGameEventsProcessor { super(); this.betProvider = new BetikaProvider(); } - - public async subscribeToChannels(): Promise> { - const getBetProviderConfigResult = await this.betProvider.getConfig(); - if (getBetProviderConfigResult.result === "error") { - logger.error("Events processor failed to load config for provider: ", this.betProvider.name); - return getBetProviderConfigResult; - } - - const getPostgresDbResult = await PostgresDataSourceSingleton.getInstance(getConfig()); - if (getPostgresDbResult.result === "error") { - logger.error(`Events processor failed to get postgres connection for provider ${this.betProvider.name} with error: `,getPostgresDbResult.value.message); - return getPostgresDbResult; - } - - const getRedisSubscriberResult = await RedisSingleton.getSubscriber(); - if (getRedisSubscriberResult.result === "success") { - const betProviderConfig = getBetProviderConfigResult.value; - const results = betProviderConfig.games.map(async game => { - await getRedisSubscriberResult.value.subscribe(getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType), async message => { - const parsedMessage = JSON.parse(message) as ProcessedGameEvents; - - - /** - * TODO: Payload is really similar to DbTwoWayGameEvent / DbThreeWayGameEvent. Is there any way we can combine them? - * We are already using discriminated unions to correctly type case different game types. - */ - const innerResults = parsedMessage.data.map(async item => { - /** - * TODO: Check if games exists before attempting insert. (We shouldn't get double inserts as now as well due to unique constraints on the database) - */ - switch (item.type) { - case BetTypes.TWO_WAY: - const twoWayEventToDB: DbTwoWayGameEvent = { - betProviderName: parsedMessage.betProviderName, - betProviderId: item.betProviderId, - clubA: item.clubA, - clubB: item.clubB, - oddsAWin: item.oddsAWin, - oddsBWin: item.oddsBWin, - gameName: parsedMessage.gameName, - league: item.league, - metaData: item.meta - } - await insertTwoWayGameEvent(getPostgresDbResult.value, twoWayEventToDB); - break; - case BetTypes.THREE_WAY: - const threeWayEventToDb: DbThreeWayGameEvent = { - betProviderName: parsedMessage.betProviderName, - betProviderId: item.betProviderId, - clubA: item.clubA, - clubB: item.clubB, - oddsAWin: item.oddsAWin, - oddsBWin: item.oddsBWin, - oddsDraw: item.oddsDraw, - gameName: parsedMessage.gameName, - league: item.league, - metaData: item.meta - } - await insertThreeWayGameEvent(getPostgresDbResult.value, threeWayEventToDb); - break; - default: - const message = `Unknown bet type encountered when saving processed game events to database.`; - logger.error(message, item); - throw new Error(message); - } - }); - await Promise.all(innerResults); - }); - }); - await Promise.all(results); - return {result: "success", value: true}; - } else { - logger.error("Events processor failed to connect to redis subscriber for provider: ", this.betProvider.name); - return getRedisSubscriberResult; - } - } } \ No newline at end of file diff --git a/src/core/game_events/index.ts b/src/core/game_events/index.ts index 142e552..556ea2b 100644 --- a/src/core/game_events/index.ts +++ b/src/core/game_events/index.ts @@ -1,5 +1,100 @@ +import { getConfig } from "../.."; import { BetProvider } from "../../bet_providers"; +import { PostgresDataSourceSingleton } from "../../datastores/postgres"; +import { getThreeWayGame, insertThreeWayGameEvent } from "../../datastores/postgres/queries/three_way_game_event"; +import { getTwoWayGame, insertTwoWayGameEvent } from "../../datastores/postgres/queries/two_way_game_event"; +import { RedisSingleton } from "../../datastores/redis"; +import { getRedisProcessedEventsChannelName } from "../../utils/redis"; +import { BetTypes, ProcessedGameEvents } from "../../utils/types/common"; +import { Result } from "../../utils/types/result_type"; + +const {logger} = getConfig(); export abstract class BaseGameEventsProcessor { public abstract betProvider: BetProvider + /** + * Listener for new game events for all providers. Can use the same implementation, since the data is already + * normalized in the `parsers` stage. + * Inserts the game events into the respective game event table. + * TODO: + * - Do not double insert if event already exists (should already be blocked by database constraints) + * - Update on updated odds + * - Store outdated odds in a historical database + * @returns + */ + public async initGameEventsListener(): Promise> { + const getBetProviderConfigResult = await this.betProvider.getConfig(); + if (getBetProviderConfigResult.result === "error") { + logger.error("Event processor failed to load config for provider: ", this.betProvider.name); + return getBetProviderConfigResult; + } + + const getPostgresDbResult = await PostgresDataSourceSingleton.getInstance(getConfig()); + if (getPostgresDbResult.result === "error") { + logger.error(`Events processor failed to get postgres connection for provider ${this.betProvider.name} error: `, getPostgresDbResult.value.message); + return getPostgresDbResult; + } + + const getRedisSubscriberResult = await RedisSingleton.getSubscriber(); + if (getRedisSubscriberResult.result === "error") { + logger.error("Events processor failed to connect to redis subscriber for provider: ", this.betProvider.name); + return getRedisSubscriberResult; + } + + const results = getBetProviderConfigResult.value.games.map(async game => { + await getRedisSubscriberResult.value.subscribe(getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType), async message => { + logger.trace("Listening for redis messages on channel: ", getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType)); + const parsedMessage = JSON.parse(message) as ProcessedGameEvents; + + const innerResults = parsedMessage.data.map(async item => { + switch (item.type) { + case BetTypes.TWO_WAY: + const twoWayGame = await getTwoWayGame(getPostgresDbResult.value, item.betProviderId, parsedMessage.betProviderName); + if (twoWayGame === null) { + await insertTwoWayGameEvent(getPostgresDbResult.value, { + betProviderName: parsedMessage.betProviderName, + betProviderId: item.betProviderId, + clubA: item.clubA, + clubB: item.clubB, + oddsAWin: item.oddsAWin, + oddsBWin: item.oddsBWin, + gameName: parsedMessage.gameName, + league: item.league, + metaData: item.meta + }); + } else { + logger.trace("Two way game already exists: ", twoWayGame); + } + break; + case BetTypes.THREE_WAY: + const threeWayGame = await getThreeWayGame(getPostgresDbResult.value, item.betProviderId, parsedMessage.betProviderName); + if (threeWayGame === null) { + await insertThreeWayGameEvent(getPostgresDbResult.value, { + betProviderName: parsedMessage.betProviderName, + betProviderId: item.betProviderId, + clubA: item.clubA, + clubB: item.clubB, + oddsAWin: item.oddsAWin, + oddsBWin: item.oddsBWin, + oddsDraw: item.oddsDraw, + gameName: parsedMessage.gameName, + league: item.league, + metaData: item.meta + }); + } else { + logger.trace("Three way game already exists: ", threeWayGame); + } + break; + default: + const message = `Unknown bet type encountered when saving processed game events to database.`; + logger.error(message, item); + throw new Error(message); + } + }); + await Promise.all(innerResults); + }); + }); + await Promise.all(results); + return {result: "success", value: true}; + } } diff --git a/src/core/game_events/orbit/index.ts b/src/core/game_events/orbit/index.ts new file mode 100644 index 0000000..398f3ae --- /dev/null +++ b/src/core/game_events/orbit/index.ts @@ -0,0 +1,12 @@ +import { BaseGameEventsProcessor } from ".."; +import { BetProvider } from "../../../bet_providers"; +import { OrbitProvider } from "../../../bet_providers/orbit"; + +export class OrbitGameEventsProcessor extends BaseGameEventsProcessor { + public override betProvider: BetProvider; + + constructor() { + super(); + this.betProvider = new OrbitProvider(); + } +} diff --git a/src/core/parsers/betika/index.ts b/src/core/parsers/betika/index.ts index c7cf24e..bc8a053 100644 --- a/src/core/parsers/betika/index.ts +++ b/src/core/parsers/betika/index.ts @@ -125,10 +125,10 @@ export class BetikaParser extends BaseParser { getRedisPublisherResult.value, getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType), { - betProviderName: parsedMessage.betProviderName, - betType: parsedMessage.betType, - gameName: parsedMessage.gameName, - data: parsedResults + betProviderName: parsedMessage.betProviderName, + betType: parsedMessage.betType, + gameName: parsedMessage.gameName, + data: parsedResults }); } else { const message = "Failed to get redis publisher to send processed events: "; diff --git a/src/core/parsers/orbit/index.ts b/src/core/parsers/orbit/index.ts index d0a18df..e0932a8 100644 --- a/src/core/parsers/orbit/index.ts +++ b/src/core/parsers/orbit/index.ts @@ -3,10 +3,10 @@ import { getConfig } from "../../.."; import { BetProvider } from "../../../bet_providers"; import { OrbitProvider } from "../../../bet_providers/orbit"; import { RedisSingleton } from "../../../datastores/redis"; -import { getRedisHtmlParserChannelName } from "../../../utils/redis"; -import { BetTypes, RawHtmlForProcessingMessage } from "../../../utils/types/common"; +import { getRedisHtmlParserChannelName, getRedisProcessedEventsChannelName } from "../../../utils/redis"; +import { BetTypes, ProcessedThreeWayGameEvent, ProcessedTwoWayGameEvent, RawHtmlForProcessingMessage } from "../../../utils/types/common"; import { Result } from "../../../utils/types/result_type"; -import { processOrbitThreeWayGamesHtml } from "./parser_types"; +import { processOrbitGamesHtml } from "./parser_types"; const {logger} = getConfig(); @@ -50,15 +50,55 @@ export class OrbitParser extends BaseParser { } } - private processRawHtmlMessage(parsedMessage: RawHtmlForProcessingMessage): void { + private async processRawHtmlMessage(parsedMessage: RawHtmlForProcessingMessage): Promise { let results2; + let parsedResults: ProcessedTwoWayGameEvent[] | ProcessedThreeWayGameEvent[]; switch (parsedMessage.betType) { - case BetTypes.THREE_WAY: - results2 = processOrbitThreeWayGamesHtml(parsedMessage.rawHtml); - break; case BetTypes.TWO_WAY: + results2 = processOrbitGamesHtml(parsedMessage.rawHtml); + if (results2.result === "success") { + parsedResults = results2.value.map(item => { + return { + type: BetTypes.TWO_WAY, + betProviderId: `${item.clubA}_${item.clubB}_${item.eventDate}`, // TODO: create id creator on specific betProvider class + clubA: item.clubA, + clubB: item.clubB, + oddsAWin: (item.oddsArray[0] + item.oddsArray[1]) / 2, + oddsBWin: (item.oddsArray[2] + item.oddsArray[3]) / 2, + league: "N/A", + estimatedStartTimeUtc: item.estimatedStartTimeUtc, + meta: JSON.stringify({ + oddsArray: item.oddsArray + }) + }; + }); + } else { + throw new Error("Failed to process Orbit two way games html"); + } + break; + case BetTypes.THREE_WAY: // Thinking that the parser should also work for two way games - results2 = processOrbitThreeWayGamesHtml(parsedMessage.rawHtml); + results2 = processOrbitGamesHtml(parsedMessage.rawHtml); + if (results2.result === "success") { + parsedResults = results2.value.map(item => { + return { + type: BetTypes.THREE_WAY, + betProviderId: `${item.clubA}_${item.clubB}_${item.eventDate}`, + clubA: item.clubA, + clubB: item.clubB, + oddsAWin: (item.oddsArray[0] + item.oddsArray[1]) / 2, + oddsBWin:(item.oddsArray[4] + item.oddsArray[5]) / 2, + oddsDraw: (item.oddsArray[2] + item.oddsArray[3]) / 2, + league: "N/A", + estimatedStartTimeUtc: item.estimatedStartTimeUtc, + meta: JSON.stringify({ + oddsArray: item.oddsArray + }) + } + }); + } else { + throw new Error("Failed to process Orbit three way games html"); + } break; default: const message = "Unknown bet type provided"; @@ -71,15 +111,29 @@ export class OrbitParser extends BaseParser { throw new Error(`Unknown bet type provided for provider: ${this.betProvider.name}`); } - if (results2.result === "success") { - logger.info("Successfully fetched games", results2.value); + logger.info("Successfully fetched games", results2.value); + const getRedisPublisherResult = await RedisSingleton.getPublisher(); + + if (getRedisPublisherResult.result === "success") { + this.publishProcessedGameEvents( + getRedisPublisherResult.value, + getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType), + { + betProviderName: parsedMessage.betProviderName, + betType: parsedMessage.betType, + gameName: parsedMessage.gameName, + data: parsedResults + } + ); + logger.trace("Published messages to redis on channel: ", getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType)); } else { - logger.error("Failed to parse html into games: ", { + const message = "Failed to get redis publisher to send processed events: "; + logger.error(message, { betProviderName: parsedMessage.betProviderName, betType: parsedMessage.betType, fromUrl: parsedMessage.fromUrl, gameName: parsedMessage.gameName, - errorMessage: results2.value.message + errorMessage: getRedisPublisherResult.value.message }); } } diff --git a/src/core/parsers/orbit/parser_types.ts b/src/core/parsers/orbit/parser_types.ts index 7c80e96..8651c6a 100644 --- a/src/core/parsers/orbit/parser_types.ts +++ b/src/core/parsers/orbit/parser_types.ts @@ -8,7 +8,7 @@ import { TimeZones } from "../../../utils/types/common"; const {logger} = getConfig(); -export function processOrbitThreeWayGamesHtml(html: string): Result { +export function processOrbitGamesHtml(html: string): Result { const gameEvents: any[] = []; const $ = cheerio.load(html); @@ -83,7 +83,7 @@ export function processOrbitThreeWayGamesHtml(html: string): Result { - return {...item, ...{eventDate: momentTz(`${item.startDate} ${item.parsedTime[0]}`, "ddd DD MMM HH:mm").toDate()}}; + return {...item, ...{estimatedStartTimeUtc: momentTz(`${item.startDate} ${item.parsedTime[0]}`, "ddd DD MMM HH:mm").toDate()}}; }); logger.trace(finalMapping); diff --git a/src/core/scrapping/orbit/index.ts b/src/core/scrapping/orbit/index.ts index 38d3ce3..347b419 100644 --- a/src/core/scrapping/orbit/index.ts +++ b/src/core/scrapping/orbit/index.ts @@ -60,7 +60,6 @@ export class OrbitScrapper extends BaseScrapper { if (getHtmlResult.result === "success") { logger.info("Successfully fetched html for url. ", metadata); - logger.info(getHtmlResult.value.html); this.publishRawHtmlToRedis( getRedisPublisherResult.value, getRedisHtmlParserChannelName(this.betProvider, game), diff --git a/src/testbed/testbed_3.ts b/src/testbed/testbed_3.ts index e5d33f3..415c4ae 100644 --- a/src/testbed/testbed_3.ts +++ b/src/testbed/testbed_3.ts @@ -1,4 +1,4 @@ -import { BetikaGameEventsProcessor } from "../core/game_events/betika"; +import { OrbitGameEventsProcessor } from "../core/game_events/orbit"; -const gameEventsProcessor = new BetikaGameEventsProcessor(); -gameEventsProcessor.subscribeToChannels(); \ No newline at end of file +const gameEventsProcessor = new OrbitGameEventsProcessor(); +gameEventsProcessor.initGameEventsListener();