Files
pump-parser/internal/example/yellowstone.go

295 lines
7.6 KiB
Go
Raw Permalink Normal View History

2025-12-22 17:56:40 +08:00
package parser
2025-11-20 17:56:45 +08:00
import (
"context"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
types "github.com/thloyi/pump-parser"
2025-12-22 17:56:40 +08:00
pb "go.onsig.ai/onsig/yellowstone-proto"
2025-11-20 17:56:45 +08:00
)
type Handler interface {
HandleMessage(rawTx *types.RawTx)
}
var kacp = keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}
type Client struct {
ch chan SubscriptionMessage
endpoint string
conn *grpc.ClientConn
ctx context.Context
lastReceiveTime time.Time
backoffFactor float64
subscription *pb.SubscribeRequest
subStatus bool
leastBlock BlockInfo
firstMessage bool
handler Handler
}
func NewClientWithPumpSwap(endpoint string, ch chan SubscriptionMessage) *Client {
var subscription pb.SubscribeRequest
var failed = false
var vote = false
subscription.Transactions = make(map[string]*pb.SubscribeRequestFilterTransactions)
subscription.Transactions["transactions_sub"] = &pb.SubscribeRequestFilterTransactions{
Failed: &failed,
Vote: &vote,
}
subscription.Transactions["transactions_sub"].AccountInclude = []string{
"pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA", //Pump AMM
"6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P", //Pump
}
2025-12-22 17:56:40 +08:00
subscription.Transactions["transactions_sub"].AccountRequired = []string{
"ARu4n5mFdZogZAravu7CcizaojWnS6oqka37gdLT5SZn",
}
2025-11-20 17:56:45 +08:00
subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta)
subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{}
c := &Client{
backoffFactor: 1.5,
ch: ch,
endpoint: endpoint,
lastReceiveTime: time.Now(),
subStatus: false,
subscription: &subscription,
}
2025-11-21 12:01:44 +08:00
c.handler = NewPumpHandler(func(tx *types.Tx) {
c.sendTx(tx)
2025-11-20 17:56:45 +08:00
})
return c
}
func NewClientWithLaunchLab(endpoint string, ch chan SubscriptionMessage) *Client {
var subscription pb.SubscribeRequest
var failed = false
var vote = false
subscription.Transactions = make(map[string]*pb.SubscribeRequestFilterTransactions)
subscription.Transactions["transactions_sub"] = &pb.SubscribeRequestFilterTransactions{
Failed: &failed,
Vote: &vote,
}
subscription.Transactions["transactions_sub"].AccountInclude = []string{
"LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj", //LaunchLab
"CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C", //CPMM
//"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8", //V4
}
subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta)
subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{}
c := &Client{
backoffFactor: 1.5,
ch: ch,
endpoint: endpoint,
lastReceiveTime: time.Now(),
subStatus: false,
subscription: &subscription,
}
2025-11-21 12:01:44 +08:00
c.handler = NewPumpHandler(func(tx *types.Tx) {
c.sendTx(tx)
2025-11-20 17:56:45 +08:00
})
return c
}
func RunLoopWithReConnect(ctx context.Context, endpoint, program string, ch chan SubscriptionMessage) {
var client *Client
if program == types.SolProgramRaydiumLaunchLab {
client = NewClientWithLaunchLab(endpoint, ch)
} else {
client = NewClientWithPumpSwap(endpoint, ch)
}
for {
select {
case <-ctx.Done():
log.Println("Context done, exiting loop")
return
default:
}
err := client.Connect(ctx)
if err != nil {
log.Printf("Failed to connect: %v", err)
time.Sleep(5 * time.Second)
continue
}
// should not reach here, because Connect will block
panic("geyser already connected, waiting for messages...")
}
}
func (c *Client) SetSubscribe(subscription *pb.SubscribeRequest) {
c.subscription = subscription
}
func (c *Client) Connect(ctx context.Context) error {
c.ctx = ctx
if c.conn == nil {
// 连接到 geyser
conn, err := c.grpcConnect(c.endpoint, true)
if err != nil {
return err
}
c.conn = conn
}
if c.subStatus {
return nil // 已经订阅了
}
// 订阅交易
err := c.grpcSubscribe(ctx, c.conn)
if err != nil {
if c.conn != nil {
c.conn.Close()
}
c.conn = nil
c.subStatus = false
log.Printf("Failed to subscribe: %v", err)
return err
}
return nil
}
func (c *Client) grpcConnect(address string, plaintext bool) (*grpc.ClientConn, error) {
var opts []grpc.DialOption
if plaintext {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
pool, _ := x509.SystemCertPool()
creds := credentials.NewClientTLSFromCert(pool, "")
opts = append(opts, grpc.WithTransportCredentials(creds))
}
opts = append(opts, grpc.WithKeepaliveParams(kacp))
log.Println("Starting grpc client, connecting to", address)
conn, err := grpc.NewClient(address, opts...)
if err != nil {
return nil, fmt.Errorf("fail to dial: %v", err)
}
return conn, nil
}
func (c *Client) grpcSubscribe(ctx context.Context, conn *grpc.ClientConn) error {
var err error
client := pb.NewGeyserClient(conn)
//subscription.Transactions["transactions_sub"].AccountExclude = transactionsAccountsExclude
subscriptionJson, err := json.Marshal(c.subscription)
if err != nil {
log.Printf("Failed to marshal subscription request: %v", subscriptionJson)
return err
}
log.Printf("Subscription request: %s", string(subscriptionJson))
// Set up the subscription request
//if *token != "" {
// md := metadata.New(map[string]string{"x-token": *token})
// ctx = metadata.NewOutgoingContext(ctx, md)
//}
md := metadata.New(map[string]string{"x-token": "5adcf1f9-5719-43d1-bf3f-c2d4e1e5f94d"})
ctx = metadata.NewOutgoingContext(ctx, md)
stream, err := client.Subscribe(ctx)
if err != nil {
return err
}
err = stream.Send(c.subscription)
if err != nil {
return err
}
c.subStatus = true
c.firstMessage = true
for {
resp, err := stream.Recv()
if err == io.EOF {
return err
}
if err != nil {
return fmt.Errorf("error occurred in receiving update: %s", err)
}
txn := resp.GetTransaction()
if txn == nil {
blockMeta := resp.GetBlockMeta()
if blockMeta != nil && c.ch != nil {
c.sendBlock(blockMeta)
}
continue
}
2025-12-22 17:56:40 +08:00
rawTx, err := types.ConvertYellowstoneGrpcTransactionToSolanaTransaction(txn, resp.GetCreatedAt().Seconds)
2025-11-20 17:56:45 +08:00
if err != nil {
log.Printf("Failed to convert transaction: %v", err)
continue
}
c.handler.HandleMessage(rawTx)
}
}
func (c *Client) computeDelay(slot uint64) int64 {
if c.leastBlock.Slot == 0 {
return 0
}
if slot < c.leastBlock.Slot {
return 0
}
delay := time.Now().Unix() - c.leastBlock.BlockTime - (4 * int64(slot-c.leastBlock.Slot) / 10)
return delay
}
2025-11-21 12:01:44 +08:00
func (c *Client) sendTx(t *types.Tx) {
2025-11-20 17:56:45 +08:00
c.ch <- SubscriptionMessage{
Reconnect: c.firstMessage,
2025-11-21 12:01:44 +08:00
EstimateDelaySecond: c.computeDelay(t.Block),
2025-11-20 17:56:45 +08:00
Block: nil,
Tx: t,
}
c.firstMessage = false
}
func (c *Client) sendBlock(blockMeta *pb.SubscribeUpdateBlockMeta) {
c.leastBlock.Slot = blockMeta.GetSlot()
c.leastBlock.BlockTime = blockMeta.GetBlockTime().Timestamp
c.leastBlock.BlockHash = blockMeta.Blockhash
c.leastBlock.Height = blockMeta.BlockHeight.BlockHeight
c.ch <- SubscriptionMessage{
EstimateDelaySecond: time.Now().Unix() - blockMeta.GetBlockTime().Timestamp,
Reconnect: c.firstMessage,
Block: &BlockInfo{
Slot: c.leastBlock.Slot,
BlockTime: c.leastBlock.BlockTime,
BlockHash: c.leastBlock.BlockHash,
Height: c.leastBlock.Height,
},
2025-11-21 12:01:44 +08:00
Tx: nil,
2025-11-20 17:56:45 +08:00
}
c.firstMessage = false
}