Skip to content

Commit

Permalink
Feat: Store Orbit Game Events To DB (#8)
Browse files Browse the repository at this point in the history
* Refactor main game events logic into base class (untested)

* minor newline fix

* Get orbit data to store (but still bug with odds calculation)

* correct orbit odds avergaing
  • Loading branch information
nigelnindodev authored Oct 19, 2023
1 parent 880fa0c commit 65efdac
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 109 deletions.
87 changes: 0 additions & 87 deletions src/core/game_events/betika/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
import { BaseGameEventsProcessor } from "..";
import { getConfig } from "../../..";
import { BetProvider } from "../../../bet_providers";
import { BetikaProvider } from "../../../bet_providers/betika";
import { PostgresDataSourceSingleton } from "../../../datastores/postgres";
import { insertThreeWayGameEvent } from "../../../datastores/postgres/queries/three_way_game_event";
import { insertTwoWayGameEvent } from "../../../datastores/postgres/queries/two_way_game_event";
import { RedisSingleton } from "../../../datastores/redis";
import { getRedisProcessedEventsChannelName } from "../../../utils/redis";
import { BetTypes, ProcessedGameEvents } from "../../../utils/types/common";
import { DbThreeWayGameEvent, DbTwoWayGameEvent } from "../../../utils/types/db";
import { Result } from "../../../utils/types/result_type";

const {logger} = getConfig();

export class BetikaGameEventsProcessor extends BaseGameEventsProcessor {
public override betProvider: BetProvider;
Expand All @@ -20,80 +9,4 @@ export class BetikaGameEventsProcessor extends BaseGameEventsProcessor {
super();
this.betProvider = new BetikaProvider();
}

public async subscribeToChannels(): Promise<Result<boolean, Error>> {
const getBetProviderConfigResult = await this.betProvider.getConfig();
if (getBetProviderConfigResult.result === "error") {
logger.error("Events processor failed to load config for provider: ", this.betProvider.name);
return getBetProviderConfigResult;
}

const getPostgresDbResult = await PostgresDataSourceSingleton.getInstance(getConfig());
if (getPostgresDbResult.result === "error") {
logger.error(`Events processor failed to get postgres connection for provider ${this.betProvider.name} with error: `,getPostgresDbResult.value.message);
return getPostgresDbResult;
}

const getRedisSubscriberResult = await RedisSingleton.getSubscriber();
if (getRedisSubscriberResult.result === "success") {
const betProviderConfig = getBetProviderConfigResult.value;
const results = betProviderConfig.games.map(async game => {
await getRedisSubscriberResult.value.subscribe(getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType), async message => {
const parsedMessage = JSON.parse(message) as ProcessedGameEvents;


/**
* TODO: Payload is really similar to DbTwoWayGameEvent / DbThreeWayGameEvent. Is there any way we can combine them?
* We are already using discriminated unions to correctly type case different game types.
*/
const innerResults = parsedMessage.data.map(async item => {
/**
* TODO: Check if games exists before attempting insert. (We shouldn't get double inserts as now as well due to unique constraints on the database)
*/
switch (item.type) {
case BetTypes.TWO_WAY:
const twoWayEventToDB: DbTwoWayGameEvent = {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
}
await insertTwoWayGameEvent(getPostgresDbResult.value, twoWayEventToDB);
break;
case BetTypes.THREE_WAY:
const threeWayEventToDb: DbThreeWayGameEvent = {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
oddsDraw: item.oddsDraw,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
}
await insertThreeWayGameEvent(getPostgresDbResult.value, threeWayEventToDb);
break;
default:
const message = `Unknown bet type encountered when saving processed game events to database.`;
logger.error(message, item);
throw new Error(message);
}
});
await Promise.all(innerResults);
});
});
await Promise.all(results);
return {result: "success", value: true};
} else {
logger.error("Events processor failed to connect to redis subscriber for provider: ", this.betProvider.name);
return getRedisSubscriberResult;
}
}
}
95 changes: 95 additions & 0 deletions src/core/game_events/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,100 @@
import { getConfig } from "../..";
import { BetProvider } from "../../bet_providers";
import { PostgresDataSourceSingleton } from "../../datastores/postgres";
import { getThreeWayGame, insertThreeWayGameEvent } from "../../datastores/postgres/queries/three_way_game_event";
import { getTwoWayGame, insertTwoWayGameEvent } from "../../datastores/postgres/queries/two_way_game_event";
import { RedisSingleton } from "../../datastores/redis";
import { getRedisProcessedEventsChannelName } from "../../utils/redis";
import { BetTypes, ProcessedGameEvents } from "../../utils/types/common";
import { Result } from "../../utils/types/result_type";

const {logger} = getConfig();

export abstract class BaseGameEventsProcessor {
public abstract betProvider: BetProvider
/**
* Listener for new game events for all providers. Can use the same implementation, since the data is already
* normalized in the `parsers` stage.
* Inserts the game events into the respective game event table.
* TODO:
* - Do not double insert if event already exists (should already be blocked by database constraints)
* - Update on updated odds
* - Store outdated odds in a historical database
* @returns
*/
public async initGameEventsListener(): Promise<Result<boolean, Error>> {
const getBetProviderConfigResult = await this.betProvider.getConfig();
if (getBetProviderConfigResult.result === "error") {
logger.error("Event processor failed to load config for provider: ", this.betProvider.name);
return getBetProviderConfigResult;
}

const getPostgresDbResult = await PostgresDataSourceSingleton.getInstance(getConfig());
if (getPostgresDbResult.result === "error") {
logger.error(`Events processor failed to get postgres connection for provider ${this.betProvider.name} error: `, getPostgresDbResult.value.message);
return getPostgresDbResult;
}

const getRedisSubscriberResult = await RedisSingleton.getSubscriber();
if (getRedisSubscriberResult.result === "error") {
logger.error("Events processor failed to connect to redis subscriber for provider: ", this.betProvider.name);
return getRedisSubscriberResult;
}

const results = getBetProviderConfigResult.value.games.map(async game => {
await getRedisSubscriberResult.value.subscribe(getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType), async message => {
logger.trace("Listening for redis messages on channel: ", getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType));
const parsedMessage = JSON.parse(message) as ProcessedGameEvents;

const innerResults = parsedMessage.data.map(async item => {
switch (item.type) {
case BetTypes.TWO_WAY:
const twoWayGame = await getTwoWayGame(getPostgresDbResult.value, item.betProviderId, parsedMessage.betProviderName);
if (twoWayGame === null) {
await insertTwoWayGameEvent(getPostgresDbResult.value, {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
});
} else {
logger.trace("Two way game already exists: ", twoWayGame);
}
break;
case BetTypes.THREE_WAY:
const threeWayGame = await getThreeWayGame(getPostgresDbResult.value, item.betProviderId, parsedMessage.betProviderName);
if (threeWayGame === null) {
await insertThreeWayGameEvent(getPostgresDbResult.value, {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
oddsDraw: item.oddsDraw,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
});
} else {
logger.trace("Three way game already exists: ", threeWayGame);
}
break;
default:
const message = `Unknown bet type encountered when saving processed game events to database.`;
logger.error(message, item);
throw new Error(message);
}
});
await Promise.all(innerResults);
});
});
await Promise.all(results);
return {result: "success", value: true};
}
}
12 changes: 12 additions & 0 deletions src/core/game_events/orbit/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { BaseGameEventsProcessor } from "..";
import { BetProvider } from "../../../bet_providers";
import { OrbitProvider } from "../../../bet_providers/orbit";

export class OrbitGameEventsProcessor extends BaseGameEventsProcessor {
public override betProvider: BetProvider;

constructor() {
super();
this.betProvider = new OrbitProvider();
}
}
8 changes: 4 additions & 4 deletions src/core/parsers/betika/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ export class BetikaParser extends BaseParser {
getRedisPublisherResult.value,
getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType),
{
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
gameName: parsedMessage.gameName,
data: parsedResults
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
gameName: parsedMessage.gameName,
data: parsedResults
});
} else {
const message = "Failed to get redis publisher to send processed events: ";
Expand Down
78 changes: 66 additions & 12 deletions src/core/parsers/orbit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { getConfig } from "../../..";
import { BetProvider } from "../../../bet_providers";
import { OrbitProvider } from "../../../bet_providers/orbit";
import { RedisSingleton } from "../../../datastores/redis";
import { getRedisHtmlParserChannelName } from "../../../utils/redis";
import { BetTypes, RawHtmlForProcessingMessage } from "../../../utils/types/common";
import { getRedisHtmlParserChannelName, getRedisProcessedEventsChannelName } from "../../../utils/redis";
import { BetTypes, ProcessedThreeWayGameEvent, ProcessedTwoWayGameEvent, RawHtmlForProcessingMessage } from "../../../utils/types/common";
import { Result } from "../../../utils/types/result_type";
import { processOrbitThreeWayGamesHtml } from "./parser_types";
import { processOrbitGamesHtml } from "./parser_types";

const {logger} = getConfig();

Expand Down Expand Up @@ -50,15 +50,55 @@ export class OrbitParser extends BaseParser {
}
}

private processRawHtmlMessage(parsedMessage: RawHtmlForProcessingMessage): void {
private async processRawHtmlMessage(parsedMessage: RawHtmlForProcessingMessage): Promise<void> {
let results2;
let parsedResults: ProcessedTwoWayGameEvent[] | ProcessedThreeWayGameEvent[];
switch (parsedMessage.betType) {
case BetTypes.THREE_WAY:
results2 = processOrbitThreeWayGamesHtml(parsedMessage.rawHtml);
break;
case BetTypes.TWO_WAY:
results2 = processOrbitGamesHtml(parsedMessage.rawHtml);
if (results2.result === "success") {
parsedResults = results2.value.map(item => {
return {
type: BetTypes.TWO_WAY,
betProviderId: `${item.clubA}_${item.clubB}_${item.eventDate}`, // TODO: create id creator on specific betProvider class
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: (item.oddsArray[0] + item.oddsArray[1]) / 2,
oddsBWin: (item.oddsArray[2] + item.oddsArray[3]) / 2,
league: "N/A",
estimatedStartTimeUtc: item.estimatedStartTimeUtc,
meta: JSON.stringify({
oddsArray: item.oddsArray
})
};
});
} else {
throw new Error("Failed to process Orbit two way games html");
}
break;
case BetTypes.THREE_WAY:
// Thinking that the parser should also work for two way games
results2 = processOrbitThreeWayGamesHtml(parsedMessage.rawHtml);
results2 = processOrbitGamesHtml(parsedMessage.rawHtml);
if (results2.result === "success") {
parsedResults = results2.value.map(item => {
return {
type: BetTypes.THREE_WAY,
betProviderId: `${item.clubA}_${item.clubB}_${item.eventDate}`,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: (item.oddsArray[0] + item.oddsArray[1]) / 2,
oddsBWin:(item.oddsArray[4] + item.oddsArray[5]) / 2,
oddsDraw: (item.oddsArray[2] + item.oddsArray[3]) / 2,
league: "N/A",
estimatedStartTimeUtc: item.estimatedStartTimeUtc,
meta: JSON.stringify({
oddsArray: item.oddsArray
})
}
});
} else {
throw new Error("Failed to process Orbit three way games html");
}
break;
default:
const message = "Unknown bet type provided";
Expand All @@ -71,15 +111,29 @@ export class OrbitParser extends BaseParser {
throw new Error(`Unknown bet type provided for provider: ${this.betProvider.name}`);
}

if (results2.result === "success") {
logger.info("Successfully fetched games", results2.value);
logger.info("Successfully fetched games", results2.value);
const getRedisPublisherResult = await RedisSingleton.getPublisher();

if (getRedisPublisherResult.result === "success") {
this.publishProcessedGameEvents(
getRedisPublisherResult.value,
getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType),
{
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
gameName: parsedMessage.gameName,
data: parsedResults
}
);
logger.trace("Published messages to redis on channel: ", getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType));
} else {
logger.error("Failed to parse html into games: ", {
const message = "Failed to get redis publisher to send processed events: ";
logger.error(message, {
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
fromUrl: parsedMessage.fromUrl,
gameName: parsedMessage.gameName,
errorMessage: results2.value.message
errorMessage: getRedisPublisherResult.value.message
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/parsers/orbit/parser_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { TimeZones } from "../../../utils/types/common";

const {logger} = getConfig();

export function processOrbitThreeWayGamesHtml(html: string): Result<any[], Error> {
export function processOrbitGamesHtml(html: string): Result<any[], Error> {
const gameEvents: any[] = [];

const $ = cheerio.load(html);
Expand Down Expand Up @@ -83,7 +83,7 @@ export function processOrbitThreeWayGamesHtml(html: string): Result<any[], Error
});

finalMapping = finalMapping.map(item => {
return {...item, ...{eventDate: momentTz(`${item.startDate} ${item.parsedTime[0]}`, "ddd DD MMM HH:mm").toDate()}};
return {...item, ...{estimatedStartTimeUtc: momentTz(`${item.startDate} ${item.parsedTime[0]}`, "ddd DD MMM HH:mm").toDate()}};
});

logger.trace(finalMapping);
Expand Down
1 change: 0 additions & 1 deletion src/core/scrapping/orbit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export class OrbitScrapper extends BaseScrapper {

if (getHtmlResult.result === "success") {
logger.info("Successfully fetched html for url. ", metadata);
logger.info(getHtmlResult.value.html);
this.publishRawHtmlToRedis(
getRedisPublisherResult.value,
getRedisHtmlParserChannelName(this.betProvider, game),
Expand Down
6 changes: 3 additions & 3 deletions src/testbed/testbed_3.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BetikaGameEventsProcessor } from "../core/game_events/betika";
import { OrbitGameEventsProcessor } from "../core/game_events/orbit";

const gameEventsProcessor = new BetikaGameEventsProcessor();
gameEventsProcessor.subscribeToChannels();
const gameEventsProcessor = new OrbitGameEventsProcessor();
gameEventsProcessor.initGameEventsListener();

0 comments on commit 65efdac

Please sign in to comment.