Skip to content

Commit

Permalink
rename kafka TimeZone to DateTimeZone #1043
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Feb 23, 2023
1 parent 49d9a1b commit 192b3e2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
4 changes: 2 additions & 2 deletions driver/common/taskconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type DtleTaskConfig struct {
SrcConnectionConfig *mysqlconfig.ConnectionConfig `codec:"SrcConnectionConfig"`
DestConnectionConfig *mysqlconfig.ConnectionConfig `codec:"DestConnectionConfig"`
KafkaConfig *KafkaConfig `codec:"KafkaConfig"`
DestType string `codec:"DestType"`
DestType string `codec:"DestType"`
// support oracle extractor/applier
SrcOracleConfig *config.OracleConfig `codec:"SrcOracleConfig"`
}
Expand Down Expand Up @@ -146,7 +146,7 @@ type KafkaConfig struct {
Brokers []string
Topic string
Converter string
TimeZone string
DateTimeZone string
User string
Password string
MessageGroupMaxSize uint64
Expand Down
2 changes: 1 addition & 1 deletion driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ var (
"Brokers": hclspec.NewAttr("Brokers", "list(string)", true),
"Converter": hclspec.NewDefault(hclspec.NewAttr("Converter", "string", false),
hclspec.NewLiteral(`"json"`)),
"TimeZone": hclspec.NewDefault(hclspec.NewAttr("TimeZone", "string", false),
"DateTimeZone": hclspec.NewDefault(hclspec.NewAttr("DateTimeZone", "string", false),
hclspec.NewLiteral(`"UTC"`)),
"MessageGroupMaxSize": hclspec.NewAttr("MessageGroupMaxSize", "number", false),
"MessageGroupTimeout": hclspec.NewAttr("MessageGroupTimeout", "number", false),
Expand Down
19 changes: 10 additions & 9 deletions driver/kafka/kafka3.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ type KafkaRunner struct {
BinlogFile string
BinlogPos int64

location *time.Location
// Only for Datetime column type. Not for
dateTimeLocation *time.Location

chBinlogEntries chan *common.DataEntries
chDumpEntry chan *common.DumpEntry
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewKafkaRunner(execCtx *common.ExecContext, logger g.LoggerType, storeManag
chDumpEntry: make(chan *common.DumpEntry, 2),
chBinlogEntries: make(chan *common.DataEntries, 2),

location: time.UTC, // default value
dateTimeLocation: time.UTC, // default value

memory1: new(int64),
memory2: new(int64),
Expand Down Expand Up @@ -280,8 +281,8 @@ func (kr *KafkaRunner) Run() {
return
}

if kr.kafkaConfig.TimeZone != "" {
kr.location, err = time.LoadLocation(kr.kafkaConfig.TimeZone)
if kr.kafkaConfig.DateTimeZone != "" {
kr.dateTimeLocation, err = time.LoadLocation(kr.kafkaConfig.DateTimeZone)
if err != nil {
kr.onError(common.TaskStateDead, errors.Wrap(err, "LoadLocation"))
return
Expand Down Expand Up @@ -318,7 +319,7 @@ func (kr *KafkaRunner) getOrSetTable(schemaName string, tableName string,
} else {
kr.logger.Debug("new table info", "schemaName", schemaName, "tableName", tableName)
tableIdent := fmt.Sprintf("%v.%v.%v", kr.kafkaMgr.Cfg.Topic, table.TableSchema, table.TableName)
colDefs, keyColDefs := kafkaColumnListToColDefs(table.OriginalTableColumns, kr.location)
colDefs, keyColDefs := kafkaColumnListToColDefs(table.OriginalTableColumns, kr.dateTimeLocation)
keySchema := &SchemaJson{
schema: NewKeySchema(tableIdent, keyColDefs),
}
Expand Down Expand Up @@ -793,7 +794,7 @@ func (kr *KafkaRunner) kafkaTransformSnapshotData(
value = base64.StdEncoding.EncodeToString([]byte(valueStr))
case mysqlconfig.DateTimeColumnType:
if valueStr != "" {
value = DateTimeValue(valueStr, kr.location)
value = DateTimeValue(valueStr, kr.dateTimeLocation)
}
case mysqlconfig.DateColumnType:
if valueStr != "" {
Expand Down Expand Up @@ -1088,7 +1089,7 @@ func (kr *KafkaRunner) kafkaConvertArg(column *mysqlconfig.Column, theValue inte
}
case mysqlconfig.DateTimeColumnType:
if theValue != nil {
theValue = DateTimeValue(theValue.(string), kr.location)
theValue = DateTimeValue(theValue.(string), kr.dateTimeLocation)
}
case mysqlconfig.VarbinaryColumnType:
if theValue != nil {
Expand Down Expand Up @@ -1224,7 +1225,7 @@ func reverseBytes(bytes []byte) string {
return value
}

func kafkaColumnListToColDefs(colList *common.ColumnList, loc *time.Location) (valColDefs ColDefs, keyColDefs ColDefs) {
func kafkaColumnListToColDefs(colList *common.ColumnList, dateTimeLoc *time.Location) (valColDefs ColDefs, keyColDefs ColDefs) {
cols := colList.ColumnList()
for i, _ := range cols {
var field *Schema
Expand Down Expand Up @@ -1304,7 +1305,7 @@ func kafkaColumnListToColDefs(colList *common.ColumnList, loc *time.Location) (v
case mysqlconfig.YearColumnType:
field = NewYearField(SCHEMA_TYPE_INT32, optional, fieldName, defaultValue)
case mysqlconfig.DateTimeColumnType:
field = NewDateTimeField(optional, fieldName, defaultValue, loc)
field = NewDateTimeField(optional, fieldName, defaultValue, dateTimeLoc)
case mysqlconfig.TimeColumnType:
field = NewTimeField(optional, fieldName, defaultValue)
case mysqlconfig.TimestampColumnType:
Expand Down

0 comments on commit 192b3e2

Please sign in to comment.