Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Store Orbit Game Events To DB #8

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 0 additions & 87 deletions src/core/game_events/betika/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
import { BaseGameEventsProcessor } from "..";
import { getConfig } from "../../..";
import { BetProvider } from "../../../bet_providers";
import { BetikaProvider } from "../../../bet_providers/betika";
import { PostgresDataSourceSingleton } from "../../../datastores/postgres";
import { insertThreeWayGameEvent } from "../../../datastores/postgres/queries/three_way_game_event";
import { insertTwoWayGameEvent } from "../../../datastores/postgres/queries/two_way_game_event";
import { RedisSingleton } from "../../../datastores/redis";
import { getRedisProcessedEventsChannelName } from "../../../utils/redis";
import { BetTypes, ProcessedGameEvents } from "../../../utils/types/common";
import { DbThreeWayGameEvent, DbTwoWayGameEvent } from "../../../utils/types/db";
import { Result } from "../../../utils/types/result_type";

const {logger} = getConfig();

export class BetikaGameEventsProcessor extends BaseGameEventsProcessor {
public override betProvider: BetProvider;
Expand All @@ -20,80 +9,4 @@ export class BetikaGameEventsProcessor extends BaseGameEventsProcessor {
super();
this.betProvider = new BetikaProvider();
}

public async subscribeToChannels(): Promise<Result<boolean, Error>> {
const getBetProviderConfigResult = await this.betProvider.getConfig();
if (getBetProviderConfigResult.result === "error") {
logger.error("Events processor failed to load config for provider: ", this.betProvider.name);
return getBetProviderConfigResult;
}

const getPostgresDbResult = await PostgresDataSourceSingleton.getInstance(getConfig());
if (getPostgresDbResult.result === "error") {
logger.error(`Events processor failed to get postgres connection for provider ${this.betProvider.name} with error: `,getPostgresDbResult.value.message);
return getPostgresDbResult;
}

const getRedisSubscriberResult = await RedisSingleton.getSubscriber();
if (getRedisSubscriberResult.result === "success") {
const betProviderConfig = getBetProviderConfigResult.value;
const results = betProviderConfig.games.map(async game => {
await getRedisSubscriberResult.value.subscribe(getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType), async message => {
const parsedMessage = JSON.parse(message) as ProcessedGameEvents;


/**
* TODO: Payload is really similar to DbTwoWayGameEvent / DbThreeWayGameEvent. Is there any way we can combine them?
* We are already using discriminated unions to correctly type case different game types.
*/
const innerResults = parsedMessage.data.map(async item => {
/**
* TODO: Check if games exists before attempting insert. (We shouldn't get double inserts as now as well due to unique constraints on the database)
*/
switch (item.type) {
case BetTypes.TWO_WAY:
const twoWayEventToDB: DbTwoWayGameEvent = {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
}
await insertTwoWayGameEvent(getPostgresDbResult.value, twoWayEventToDB);
break;
case BetTypes.THREE_WAY:
const threeWayEventToDb: DbThreeWayGameEvent = {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
oddsDraw: item.oddsDraw,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
}
await insertThreeWayGameEvent(getPostgresDbResult.value, threeWayEventToDb);
break;
default:
const message = `Unknown bet type encountered when saving processed game events to database.`;
logger.error(message, item);
throw new Error(message);
}
});
await Promise.all(innerResults);
});
});
await Promise.all(results);
return {result: "success", value: true};
} else {
logger.error("Events processor failed to connect to redis subscriber for provider: ", this.betProvider.name);
return getRedisSubscriberResult;
}
}
}
95 changes: 95 additions & 0 deletions src/core/game_events/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,100 @@
import { getConfig } from "../..";
import { BetProvider } from "../../bet_providers";
import { PostgresDataSourceSingleton } from "../../datastores/postgres";
import { getThreeWayGame, insertThreeWayGameEvent } from "../../datastores/postgres/queries/three_way_game_event";
import { getTwoWayGame, insertTwoWayGameEvent } from "../../datastores/postgres/queries/two_way_game_event";
import { RedisSingleton } from "../../datastores/redis";
import { getRedisProcessedEventsChannelName } from "../../utils/redis";
import { BetTypes, ProcessedGameEvents } from "../../utils/types/common";
import { Result } from "../../utils/types/result_type";

const {logger} = getConfig();

export abstract class BaseGameEventsProcessor {
public abstract betProvider: BetProvider
/**
* Listener for new game events for all providers. Can use the same implementation, since the data is already
* normalized in the `parsers` stage.
* Inserts the game events into the respective game event table.
* TODO:
* - Do not double insert if event already exists (should already be blocked by database constraints)
* - Update on updated odds
* - Store outdated odds in a historical database
* @returns
*/
public async initGameEventsListener(): Promise<Result<boolean, Error>> {
const getBetProviderConfigResult = await this.betProvider.getConfig();
if (getBetProviderConfigResult.result === "error") {
logger.error("Event processor failed to load config for provider: ", this.betProvider.name);
return getBetProviderConfigResult;
}

const getPostgresDbResult = await PostgresDataSourceSingleton.getInstance(getConfig());
if (getPostgresDbResult.result === "error") {
logger.error(`Events processor failed to get postgres connection for provider ${this.betProvider.name} error: `, getPostgresDbResult.value.message);
return getPostgresDbResult;
}

const getRedisSubscriberResult = await RedisSingleton.getSubscriber();
if (getRedisSubscriberResult.result === "error") {
logger.error("Events processor failed to connect to redis subscriber for provider: ", this.betProvider.name);
return getRedisSubscriberResult;
}

const results = getBetProviderConfigResult.value.games.map(async game => {
await getRedisSubscriberResult.value.subscribe(getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType), async message => {
logger.trace("Listening for redis messages on channel: ", getRedisProcessedEventsChannelName(this.betProvider, game.name, game.betType));
const parsedMessage = JSON.parse(message) as ProcessedGameEvents;

const innerResults = parsedMessage.data.map(async item => {
switch (item.type) {
case BetTypes.TWO_WAY:
const twoWayGame = await getTwoWayGame(getPostgresDbResult.value, item.betProviderId, parsedMessage.betProviderName);
if (twoWayGame === null) {
await insertTwoWayGameEvent(getPostgresDbResult.value, {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
});
} else {
logger.trace("Two way game already exists: ", twoWayGame);
}
break;
case BetTypes.THREE_WAY:
const threeWayGame = await getThreeWayGame(getPostgresDbResult.value, item.betProviderId, parsedMessage.betProviderName);
if (threeWayGame === null) {
await insertThreeWayGameEvent(getPostgresDbResult.value, {
betProviderName: parsedMessage.betProviderName,
betProviderId: item.betProviderId,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: item.oddsAWin,
oddsBWin: item.oddsBWin,
oddsDraw: item.oddsDraw,
gameName: parsedMessage.gameName,
league: item.league,
metaData: item.meta
});
} else {
logger.trace("Three way game already exists: ", threeWayGame);
}
break;
default:
const message = `Unknown bet type encountered when saving processed game events to database.`;
logger.error(message, item);
throw new Error(message);
}
});
await Promise.all(innerResults);
});
});
await Promise.all(results);
return {result: "success", value: true};
}
}
12 changes: 12 additions & 0 deletions src/core/game_events/orbit/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { BaseGameEventsProcessor } from "..";
import { BetProvider } from "../../../bet_providers";
import { OrbitProvider } from "../../../bet_providers/orbit";

export class OrbitGameEventsProcessor extends BaseGameEventsProcessor {
public override betProvider: BetProvider;

constructor() {
super();
this.betProvider = new OrbitProvider();
}
}
8 changes: 4 additions & 4 deletions src/core/parsers/betika/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ export class BetikaParser extends BaseParser {
getRedisPublisherResult.value,
getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType),
{
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
gameName: parsedMessage.gameName,
data: parsedResults
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
gameName: parsedMessage.gameName,
data: parsedResults
});
} else {
const message = "Failed to get redis publisher to send processed events: ";
Expand Down
78 changes: 66 additions & 12 deletions src/core/parsers/orbit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { getConfig } from "../../..";
import { BetProvider } from "../../../bet_providers";
import { OrbitProvider } from "../../../bet_providers/orbit";
import { RedisSingleton } from "../../../datastores/redis";
import { getRedisHtmlParserChannelName } from "../../../utils/redis";
import { BetTypes, RawHtmlForProcessingMessage } from "../../../utils/types/common";
import { getRedisHtmlParserChannelName, getRedisProcessedEventsChannelName } from "../../../utils/redis";
import { BetTypes, ProcessedThreeWayGameEvent, ProcessedTwoWayGameEvent, RawHtmlForProcessingMessage } from "../../../utils/types/common";
import { Result } from "../../../utils/types/result_type";
import { processOrbitThreeWayGamesHtml } from "./parser_types";
import { processOrbitGamesHtml } from "./parser_types";

const {logger} = getConfig();

Expand Down Expand Up @@ -50,15 +50,55 @@ export class OrbitParser extends BaseParser {
}
}

private processRawHtmlMessage(parsedMessage: RawHtmlForProcessingMessage): void {
private async processRawHtmlMessage(parsedMessage: RawHtmlForProcessingMessage): Promise<void> {
let results2;
let parsedResults: ProcessedTwoWayGameEvent[] | ProcessedThreeWayGameEvent[];
switch (parsedMessage.betType) {
case BetTypes.THREE_WAY:
results2 = processOrbitThreeWayGamesHtml(parsedMessage.rawHtml);
break;
case BetTypes.TWO_WAY:
results2 = processOrbitGamesHtml(parsedMessage.rawHtml);
if (results2.result === "success") {
parsedResults = results2.value.map(item => {
return {
type: BetTypes.TWO_WAY,
betProviderId: `${item.clubA}_${item.clubB}_${item.eventDate}`, // TODO: create id creator on specific betProvider class
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: (item.oddsArray[0] + item.oddsArray[1]) / 2,
oddsBWin: (item.oddsArray[2] + item.oddsArray[3]) / 2,
league: "N/A",
estimatedStartTimeUtc: item.estimatedStartTimeUtc,
meta: JSON.stringify({
oddsArray: item.oddsArray
})
};
});
} else {
throw new Error("Failed to process Orbit two way games html");
}
break;
case BetTypes.THREE_WAY:
// Thinking that the parser should also work for two way games
results2 = processOrbitThreeWayGamesHtml(parsedMessage.rawHtml);
results2 = processOrbitGamesHtml(parsedMessage.rawHtml);
if (results2.result === "success") {
parsedResults = results2.value.map(item => {
return {
type: BetTypes.THREE_WAY,
betProviderId: `${item.clubA}_${item.clubB}_${item.eventDate}`,
clubA: item.clubA,
clubB: item.clubB,
oddsAWin: (item.oddsArray[0] + item.oddsArray[1]) / 2,
oddsBWin:(item.oddsArray[4] + item.oddsArray[5]) / 2,
oddsDraw: (item.oddsArray[2] + item.oddsArray[3]) / 2,
league: "N/A",
estimatedStartTimeUtc: item.estimatedStartTimeUtc,
meta: JSON.stringify({
oddsArray: item.oddsArray
})
}
});
} else {
throw new Error("Failed to process Orbit three way games html");
}
break;
default:
const message = "Unknown bet type provided";
Expand All @@ -71,15 +111,29 @@ export class OrbitParser extends BaseParser {
throw new Error(`Unknown bet type provided for provider: ${this.betProvider.name}`);
}

if (results2.result === "success") {
logger.info("Successfully fetched games", results2.value);
logger.info("Successfully fetched games", results2.value);
const getRedisPublisherResult = await RedisSingleton.getPublisher();

if (getRedisPublisherResult.result === "success") {
this.publishProcessedGameEvents(
getRedisPublisherResult.value,
getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType),
{
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
gameName: parsedMessage.gameName,
data: parsedResults
}
);
logger.trace("Published messages to redis on channel: ", getRedisProcessedEventsChannelName(this.betProvider, parsedMessage.gameName, parsedMessage.betType));
} else {
logger.error("Failed to parse html into games: ", {
const message = "Failed to get redis publisher to send processed events: ";
logger.error(message, {
betProviderName: parsedMessage.betProviderName,
betType: parsedMessage.betType,
fromUrl: parsedMessage.fromUrl,
gameName: parsedMessage.gameName,
errorMessage: results2.value.message
errorMessage: getRedisPublisherResult.value.message
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/parsers/orbit/parser_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { TimeZones } from "../../../utils/types/common";

const {logger} = getConfig();

export function processOrbitThreeWayGamesHtml(html: string): Result<any[], Error> {
export function processOrbitGamesHtml(html: string): Result<any[], Error> {
const gameEvents: any[] = [];

const $ = cheerio.load(html);
Expand Down Expand Up @@ -83,7 +83,7 @@ export function processOrbitThreeWayGamesHtml(html: string): Result<any[], Error
});

finalMapping = finalMapping.map(item => {
return {...item, ...{eventDate: momentTz(`${item.startDate} ${item.parsedTime[0]}`, "ddd DD MMM HH:mm").toDate()}};
return {...item, ...{estimatedStartTimeUtc: momentTz(`${item.startDate} ${item.parsedTime[0]}`, "ddd DD MMM HH:mm").toDate()}};
});

logger.trace(finalMapping);
Expand Down
1 change: 0 additions & 1 deletion src/core/scrapping/orbit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export class OrbitScrapper extends BaseScrapper {

if (getHtmlResult.result === "success") {
logger.info("Successfully fetched html for url. ", metadata);
logger.info(getHtmlResult.value.html);
this.publishRawHtmlToRedis(
getRedisPublisherResult.value,
getRedisHtmlParserChannelName(this.betProvider, game),
Expand Down
6 changes: 3 additions & 3 deletions src/testbed/testbed_3.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BetikaGameEventsProcessor } from "../core/game_events/betika";
import { OrbitGameEventsProcessor } from "../core/game_events/orbit";

const gameEventsProcessor = new BetikaGameEventsProcessor();
gameEventsProcessor.subscribeToChannels();
const gameEventsProcessor = new OrbitGameEventsProcessor();
gameEventsProcessor.initGameEventsListener();