Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better logging for transaction writer, and TXUpdate de-dup #108

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hyperledger/firefly-common v1.4.2
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401
github.com/lib/pq v1.10.9
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hyperledger/firefly-common v1.4.2 h1:sBbiTFWDu1qCnXFA6JobasJl4AXphCAUZU/R4nyWPdE=
github.com/hyperledger/firefly-common v1.4.2/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401 h1:bcIg8zUalHyjxPmhIggwg/VK/IVDvpH2XJwCkfAYrSU=
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
Expand Down
7 changes: 5 additions & 2 deletions internal/persistence/postgres/eventstreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) {
var eventStreams []*apitypes.EventStream
for i := 0; i < 20; i++ {
es := &apitypes.EventStream{
ID: fftypes.NewUUID(),
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
ID: fftypes.NewUUID(),
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
BatchTimeout: ffDurationPtr(22222 * time.Second),
RetryTimeout: ffDurationPtr(33333 * time.Second),
BlockedRetryDelay: ffDurationPtr(44444 * time.Second),
}
err := p.WriteStream(ctx, es)
assert.NoError(t, err)
Expand Down
26 changes: 22 additions & 4 deletions internal/persistence/postgres/transaction_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -40,6 +40,7 @@ type transactionOperation struct {
sentConflict bool
done chan error

opID string
isShutdown bool
txInsert *apitypes.ManagedTX
noncePreAssigned bool
Expand Down Expand Up @@ -122,6 +123,7 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config.

func newTransactionOperation(txID string) *transactionOperation {
return &transactionOperation{
opID: fftypes.ShortID(),
txID: txID,
done: make(chan error, 1), // 1 slot to ensure we don't block the writer
}
Expand All @@ -130,6 +132,7 @@ func newTransactionOperation(txID string) *transactionOperation {
func (op *transactionOperation) flush(ctx context.Context) error {
select {
case err := <-op.done:
log.L(ctx).Debugf("Flushed write operation %s (err=%v)", op.opID, err)
return err
case <-ctx.Done():
return i18n.NewError(ctx, i18n.MsgContextCanceled)
Expand Down Expand Up @@ -165,6 +168,7 @@ func (tw *transactionWriter) queue(ctx context.Context, op *transactionOperation
h := fnv.New32a() // simple non-cryptographic hash algo
_, _ = h.Write([]byte(hashKey))
routine := h.Sum32() % tw.workerCount
log.L(ctx).Debugf("Queuing write operation %s to worker tx_writer_%.4d", op.opID, routine)
select {
case tw.workQueues[routine] <- op: // it's queued
case <-ctx.Done(): // timeout of caller context
Expand All @@ -180,6 +184,7 @@ func (tw *transactionWriter) worker(i int) {
defer close(tw.workersDone[i])
workerID := fmt.Sprintf("tx_writer_%.4d", i)
ctx := log.WithLogField(tw.bgCtx, "job", workerID)
l := log.L(ctx)
var batch *transactionWriterBatch
batchCount := 0
workQueue := tw.workQueues[i]
Expand Down Expand Up @@ -208,18 +213,20 @@ func (tw *transactionWriter) worker(i int) {
batchCount++
}
batch.ops = append(batch.ops, op)
l.Debugf("Added write operation %s to batch %s (len=%d)", op.opID, batch.id, len(batch.ops))
case <-timeoutContext.Done():
timedOut = true
select {
case <-ctx.Done():
log.L(ctx).Debugf("Transaction writer ending")
l.Debugf("Transaction writer ending")
return
default:
}
}

if batch != nil && (timedOut || (len(batch.ops) >= tw.batchMaxSize)) {
batch.timeoutCancel()
l.Debugf("Running batch %s (len=%d)", batch.id, len(batch.ops))
tw.runBatch(ctx, batch)
batch = nil
}
Expand Down Expand Up @@ -383,6 +390,7 @@ func (tw *transactionWriter) preInsertIdempotencyCheck(ctx context.Context, b *t
txOp.sentConflict = true
txOp.done <- i18n.NewError(ctx, tmmsgs.MsgDuplicateID, txOp.txID)
} else {
log.L(ctx).Debugf("Adding TX %s from write operation %s to insert idx=%d", txOp.txID, txOp.opID, len(validInserts))
validInserts = append(validInserts, txOp.txInsert)
}
}
Expand Down Expand Up @@ -413,9 +421,19 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction
}
}
// Do all the transaction updates
mergedUpdates := make(map[string]*apitypes.TXUpdates)
for _, op := range b.txUpdates {
if err := tw.p.updateTransaction(ctx, op.txID, op.txUpdate); err != nil {
log.L(ctx).Errorf("Update transaction %s failed: %s", op.txID, err)
update, merge := mergedUpdates[op.txID]
if merge {
update.Merge(op.txUpdate)
} else {
mergedUpdates[op.txID] = op.txUpdate
}
log.L(ctx).Debugf("Updating transaction %s in write operation %s (merged=%t)", op.txID, op.opID, merge)
}
for txID, update := range mergedUpdates {
if err := tw.p.updateTransaction(ctx, txID, update); err != nil {
log.L(ctx).Errorf("Update transaction %s failed: %s", txID, err)
return err
}
}
Expand Down
73 changes: 73 additions & 0 deletions internal/persistence/postgres/transaction_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -275,6 +276,48 @@ func TestExecuteBatchOpsUpdateTXFail(t *testing.T) {
assert.NoError(t, mdb.ExpectationsWereMet())
}

func TestExecuteBatchOpsUpdateTXMerge(t *testing.T) {
logrus.SetLevel(logrus.TraceLevel)

ctx, p, mdb, done := newMockSQLPersistence(t)
defer done()

mdb.ExpectBegin()
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
mdb.ExpectCommit()

err := p.db.RunAsGroup(ctx, func(ctx context.Context) error {
return p.writer.executeBatchOps(ctx, &transactionWriterBatch{
txUpdates: []*transactionOperation{
{
txID: "11111",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusPending),
From: strPtr("0xaaaaa"),
},
},
{
txID: "22222",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusPending),
},
},
{
txID: "11111",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusSucceeded),
TransactionHash: strPtr("0xaabbcc"),
},
},
},
})
})
assert.NoError(t, err)

assert.NoError(t, mdb.ExpectationsWereMet())
}

func TestExecuteBatchOpsUpsertReceiptFail(t *testing.T) {
ctx, p, mdb, done := newMockSQLPersistence(t)
defer done()
Expand Down Expand Up @@ -455,3 +498,33 @@ func TestQueueClosedContext(t *testing.T) {
p.writer.queue(closedCtx, newTransactionOperation("tx1"))

}

func TestStopDoneWorker(t *testing.T) {
tw := &transactionWriter{
workersDone: []chan struct{}{
make(chan struct{}),
},
}
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
close(tw.workersDone[0])
tw.stop()
}

func TestStopDoneCtx(t *testing.T) {
tw := &transactionWriter{
workersDone: []chan struct{}{
make(chan struct{}, 1),
},
}
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
tw.cancelCtx()
go func() {
time.Sleep(10 * time.Millisecond)
tw.workersDone[0] <- struct{}{}
}()
tw.stop()
}

func ptrTo[T any](v T) *T {
return &v
}
47 changes: 46 additions & 1 deletion pkg/apitypes/managed_tx.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -208,6 +208,51 @@ type TXUpdates struct {
ErrorMessage *string `json:"errorMessage,omitempty"`
}

func (txu *TXUpdates) Merge(txu2 *TXUpdates) {
if txu2.Status != nil {
txu.Status = txu2.Status
}
if txu2.DeleteRequested != nil {
txu.DeleteRequested = txu2.DeleteRequested
}
if txu2.From != nil {
txu.From = txu2.From
}
if txu2.To != nil {
txu.To = txu2.To
}
if txu2.Nonce != nil {
txu.Nonce = txu2.Nonce
}
if txu2.Gas != nil {
txu.Gas = txu2.Gas
}
if txu2.Value != nil {
txu.Value = txu2.Value
}
if txu2.GasPrice != nil {
txu.GasPrice = txu2.GasPrice
}
if txu2.TransactionData != nil {
txu.TransactionData = txu2.TransactionData
}
if txu2.TransactionHash != nil {
txu.TransactionHash = txu2.TransactionHash
}
if txu2.PolicyInfo != nil {
txu.PolicyInfo = txu2.PolicyInfo
}
if txu2.FirstSubmit != nil {
txu.FirstSubmit = txu2.FirstSubmit
}
if txu2.LastSubmit != nil {
txu.LastSubmit = txu2.LastSubmit
}
if txu2.ErrorMessage != nil {
txu.ErrorMessage = txu2.ErrorMessage
}
}

// TXWithStatus is a convenience object that fetches all data about a transaction into one
// large JSON payload (with limits on certain parts, such as the history entries).
// Note that in LevelDB persistence this is the stored form of the single document object.
Expand Down
28 changes: 28 additions & 0 deletions pkg/apitypes/managed_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,31 @@ func TestReceiptRecord(t *testing.T) {
r.SetUpdated(t2)
assert.Equal(t, t2, r.Updated)
}

func TestTXUpdatesMerge(t *testing.T) {
txu := &TXUpdates{}
txu2 := &TXUpdates{
Status: ptrTo(TxStatusPending),
DeleteRequested: fftypes.Now(),
From: ptrTo("1111"),
To: ptrTo("2222"),
Nonce: fftypes.NewFFBigInt(3333),
Gas: fftypes.NewFFBigInt(4444),
Value: fftypes.NewFFBigInt(5555),
GasPrice: fftypes.JSONAnyPtr(`{"some": "stuff"}`),
TransactionData: ptrTo("xxxx"),
TransactionHash: ptrTo("yyyy"),
PolicyInfo: fftypes.JSONAnyPtr(`{"more": "stuff"}`),
FirstSubmit: fftypes.Now(),
LastSubmit: fftypes.Now(),
ErrorMessage: ptrTo("pop"),
}
txu.Merge(txu2)
assert.Equal(t, *txu2, *txu)
txu.Merge(&TXUpdates{})
assert.Equal(t, *txu2, *txu)
}

func ptrTo[T any](v T) *T {
return &v
}