parallel parsing
This commit is contained in:
@@ -3,17 +3,26 @@ package shreder
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/gagliardetto/solana-go/rpc"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
conn *grpc.ClientConn
|
||||
client ShrederServiceClient
|
||||
tableLoader *AddressTables
|
||||
subscription map[string]*SubscribeRequestFilterTransactions
|
||||
EnableClientStats bool
|
||||
conn *grpc.ClientConn
|
||||
client ShrederServiceClient
|
||||
tableLoader *AddressTables
|
||||
subscription map[string]*SubscribeRequestFilterTransactions
|
||||
|
||||
pool *ants.Pool
|
||||
|
||||
lastSlot uint64
|
||||
lastSlotTime time.Time
|
||||
}
|
||||
|
||||
func NewShrederClient(
|
||||
@@ -30,11 +39,19 @@ func NewShrederClient(
|
||||
return nil, func() {}, err
|
||||
}
|
||||
|
||||
poolSize := runtime.NumCPU()*2 + 2
|
||||
logger.Info("creating shreder client", "url", url, "pool_size", poolSize)
|
||||
pool, err := ants.NewPool(poolSize, ants.WithNonblocking(false))
|
||||
if err != nil {
|
||||
return nil, func() {}, err
|
||||
}
|
||||
|
||||
s := &Client{
|
||||
conn: conn,
|
||||
client: NewShrederServiceClient(conn),
|
||||
subscription: subscription,
|
||||
tableLoader: NewAddressTables(rpcClient),
|
||||
pool: pool,
|
||||
}
|
||||
|
||||
return s, func() {
|
||||
@@ -45,6 +62,10 @@ func NewShrederClient(
|
||||
func (c *Client) Wait() {
|
||||
logger.Debug("waiting for shreder client to stop")
|
||||
|
||||
if c.pool != nil {
|
||||
c.pool.Release()
|
||||
}
|
||||
|
||||
err := c.conn.Close()
|
||||
if err != nil {
|
||||
logger.Error("failed to close connection: ", "err", err)
|
||||
@@ -59,6 +80,8 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Debug("subscribing to transactions")
|
||||
|
||||
err = stream.Send(&SubscribeTransactionsRequest{
|
||||
Transactions: c.subscription,
|
||||
})
|
||||
@@ -72,20 +95,38 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error
|
||||
return err
|
||||
}
|
||||
|
||||
txBatch := ParseTransaction(response.Transaction, c.tableLoader)
|
||||
if len(txBatch) == 0 {
|
||||
continue
|
||||
if c.EnableClientStats {
|
||||
slot := response.Transaction.Slot
|
||||
now := time.Now()
|
||||
if c.lastSlotTime.IsZero() || slot > c.lastSlot {
|
||||
if !c.lastSlotTime.IsZero() {
|
||||
logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds())
|
||||
}
|
||||
c.lastSlot = slot
|
||||
c.lastSlotTime = now
|
||||
}
|
||||
}
|
||||
|
||||
// set fixed source for tx signals
|
||||
for _, tx := range txBatch {
|
||||
tx.Source = "shreder"
|
||||
}
|
||||
txData := response.Transaction
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case txCh <- txBatch:
|
||||
err = c.pool.Submit(func() {
|
||||
txBatch := ParseTransaction(txData, c.tableLoader, c.EnableClientStats)
|
||||
if len(txBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, tx := range txBatch {
|
||||
tx.Source = "shreder"
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case txCh <- txBatch:
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user