26 Commits
v0.0.1 ... test

Author SHA1 Message Date
thloyi
9f73e8f57f pipo tx parse 2026-01-23 17:58:59 +08:00
6bab10866b fix: photon buy 2026-01-08 16:25:34 +08:00
83aa772710 chore: improve test 2026-01-08 16:20:17 +08:00
da51b19b50 chore: add bonk parser 2026-01-08 16:07:57 +08:00
f39b89b497 chore: add simple test 2026-01-08 15:20:50 +08:00
26e07ec52e chore: add gmgn and remove entry contract 2026-01-08 12:54:21 +08:00
35c57c3c7a chore: remove useless 2026-01-08 12:46:58 +08:00
3e58b62e1f chore: enable term 2026-01-08 12:44:47 +08:00
thloyi
4c0abc5c34 stats 2026-01-08 11:57:57 +08:00
thloyi
d9aea3e8d7 parallel parsing 2026-01-07 21:15:54 +08:00
thloyi
b82b7d9b0e dflow parser 2026-01-07 18:03:30 +08:00
d9bc106eb1 chore: add SWQoSAgentSoyas address 2026-01-07 17:59:21 +08:00
871dac8bd3 chore: improve clients, add soyas and other regions 2026-01-07 17:26:00 +08:00
thloyi
156fd9b0bf okxv2 parser 2026-01-07 15:39:32 +08:00
thloyi
2504636fb0 fix axiom 2026-01-07 13:24:23 +08:00
thloyi
c4d35bd3d4 merge 2026-01-07 13:19:48 +08:00
thloyi
214d9e984e fix axios parser 2026-01-07 13:16:22 +08:00
c30d64fe88 Merge branch 'master' of https://github.com/samlior/libsam 2026-01-07 12:19:12 +08:00
27dde60e93 chore: add entry contract and improve axiom parse 2026-01-07 12:18:24 +08:00
thloyi
122d474524 juptierv6 fix 2026-01-07 11:57:31 +08:00
thloyi
2d3f46ebbf juptierv6 2026-01-07 11:18:02 +08:00
thloyi
c732bb2b46 fix looptable index 2026-01-06 16:42:07 +08:00
99ff9968bd fix address table lookup 2026-01-05 19:34:35 +08:00
thloyi
8c98ec7875 cache address table 2026-01-05 14:38:02 +08:00
thloyi
e6922e4561 loading address tables 2026-01-05 12:45:32 +08:00
4afa412231 chore: add swqos fee addrsses 2025-12-30 16:52:41 +08:00
22 changed files with 6852 additions and 616 deletions

187
README.md
View File

@@ -15,6 +15,8 @@ go get github.com/samlior/libsam
| fra | fra1.shreder.xyz:9991 |
| ams | ams1.shreder.xyz:9991 |
| ewr | ny1.shreder.xyz:9991 |
| uk | lon.shreder.xyz:9991 |
| jp | tyo.shreder.xyz:9991 |
### Usage
@@ -105,6 +107,13 @@ See [example](./cmd/shreder/main.go).
"keepAliveUrl": "http://germany.solana.dex.blxrbdn.com/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "soyas",
"sendTxUrl": "fra.landing.soyas.xyz:9000",
"sendBundleUrl": "",
"tips": "0.001",
"rateLimit": 0
}
```
@@ -191,6 +200,13 @@ See [example](./cmd/shreder/main.go).
"keepAliveUrl": "http://amsterdam.solana.dex.blxrbdn.com/api/v2/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "soyas",
"sendTxUrl": "ams.landing.soyas.xyz:9000",
"sendBundleUrl": "",
"tips": "0.001",
"rateLimit": 0
}
```
@@ -278,6 +294,177 @@ See [example](./cmd/shreder/main.go).
"keepAliveUrl": "http://ny.solana.dex.blxrbdn.com/api/v2/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "soyas",
"sendTxUrl": "nyc.landing.soyas.xyz:9000",
"sendBundleUrl": "",
"tips": "0.001",
"rateLimit": 0
}
```
</details>
<details><summary> London </summary>
```json
{
"name": "helius",
"sendTxUrl": "http://lon-sender.helius-rpc.com/fast",
"sendBundleUrl": "",
"keepAliveUrl": "http://lon-sender.helius-rpc.com/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "blockrazor",
"sendTxUrl": "london.solana-grpc.blockrazor.xyz:80",
"sendBundleUrl": "",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "node1",
"sendTxUrl": "http://lon.node1.me",
"sendBundleUrl": "",
"keepAliveUrl": "http://lon.node1.me/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "nextblock",
"sendTxUrl": "http://london.nextblock.io/api/v2/submit",
"sendBundleUrl": "",
"keepAliveUrl": "http://london.nextblock.io/api/v2/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "flashBlock",
"sendTxUrl": "http://london.flashblock.trade/api/v2/submit-batch",
"sendBundleUrl": "http://london.flashblock.trade/api/v2/submit-batch",
"keepAliveUrl": "http://london.flashblock.trade/api/v2/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "stellium",
"sendTxUrl": "http://lhr1.flashrpc.com/be95e80d-afc2-4a48-b017-db021fc4c19e",
"sendBundleUrl": "",
"keepAliveUrl": "http://lhr1.flashrpc.com/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "blocxroute",
"sendTxUrl": "http://uk.solana.dex.blxrbdn.com/api/v2/submit",
"sendBundleUrl": "",
"keepAliveUrl": "http://uk.solana.dex.blxrbdn.com/api/v2/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "soyas",
"sendTxUrl": "lon.landing.soyas.xyz:9000",
"sendBundleUrl": "",
"tips": "0.001",
"rateLimit": 0
}
```
</details>
<details><summary> Japan </summary>
```json
{
"name": "helius",
"sendTxUrl": "http://tyo-sender.helius-rpc.com/fast",
"sendBundleUrl": "",
"keepAliveUrl": "http://tyo-sender.helius-rpc.com/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "0slot",
"sendTxUrl": "http://jp1.0slot.trade?api-key=3fec78a0d361418a8eff95be9ed85cc3&anti-mev=true",
"sendBundleUrl": "",
"keepAliveUrl": "http://jp1.0slot.trade/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "blockrazor",
"sendTxUrl": "tokyo.solana-grpc.blockrazor.xyz:80",
"sendBundleUrl": "",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "node1",
"sendTxUrl": "http://tk.node1.me",
"sendBundleUrl": "",
"keepAliveUrl": "http://tk.node1.me/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "nextblock",
"sendTxUrl": "http://tokyo.nextblock.io/api/v2/submit",
"sendBundleUrl": "",
"keepAliveUrl": "http://tokyo.nextblock.io/api/v2/ping",
"tips": "0.001",
"rateLimit": 4
},
{
"name": "flashBlock",
"sendTxUrl": "http://tokyo.flashblock.trade/api/v2/submit-batch",
"sendBundleUrl": "http://tokyo.flashblock.trade/api/v2/submit-batch",
"keepAliveUrl": "http://tokyo.flashblock.trade/api/v2/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "astralane",
"sendTxUrl": "http://jp.gateway.astralane.io/iris?api-key=zhaozNc5OIadLPI3r9nUVVPpCZcQAUjngO6Tgr5XUJcmBrIisFaaZF81Ijn01Ytn",
"sendBundleUrl": "",
"keepAliveUrl": "http://jp.gateway.astralane.io/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "nozomi",
"sendTxUrl": "http://tyo1.nozomi.temporal.xyz/?c=34cff37e-f1a5-446a-98bb-66aa1b62cb74",
"sendBundleUrl": "",
"keepAliveUrl": "http://tyo1.nozomi.temporal.xyz/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "stellium",
"sendTxUrl": "http://tyo1.flashrpc.com/be95e80d-afc2-4a48-b017-db021fc4c19e",
"sendBundleUrl": "",
"keepAliveUrl": "http://tyo1.flashrpc.com/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "blocxroute",
"sendTxUrl": "http://tokyo.solana.dex.blxrbdn.com/api/v2/submit",
"sendBundleUrl": "",
"keepAliveUrl": "http://tokyo.solana.dex.blxrbdn.com/api/v2/ping",
"tips": "0.001",
"rateLimit": 0
},
{
"name": "soyas",
"sendTxUrl": "tyo.landing.soyas.xyz:9000",
"sendBundleUrl": "",
"tips": "0.001",
"rateLimit": 0
}
```

View File

@@ -2,12 +2,16 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/gagliardetto/solana-go/rpc"
"github.com/shopspring/decimal"
"github.com/samlior/libsam/pkg/shreder"
)
@@ -17,13 +21,25 @@ func main() {
if url == "" {
panic("URL is not set")
}
rpcUrl := os.Getenv("RPC_URL")
if rpcUrl == "" {
panic("RPC_URL is not set")
}
rpcClient := rpc.New(rpcUrl)
shreder.SetLogLevel(slog.LevelDebug)
shrederClient, cleanup, err := shreder.NewShrederClient(
url,
rpcClient,
map[string]*shreder.SubscribeRequestFilterTransactions{
"pumpfunamm": {
AccountRequired: []string{
//AccountRequired: []string{
// "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA",
//},
AccountInclude: []string{
"pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA",
"GS4CU59F31iL7aR2Q8zVS8DRrcRnXX1yjQ66TqNVQnaR", //Event Authority
"5PHirr8joyTMp9JMm6nW7hNDVyEYdkzDqazxPD7RaTjx", // Fee Config
"pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ", // pump fee program
},
},
"photon": {
@@ -31,8 +47,23 @@ func main() {
"BSfD6SHZigAfDWSjzD5Q41jw8LmKwtmjskPH9XW1mrRW",
},
},
"jupiterV6": {
AccountRequired: []string{
"JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4",
},
},
"okxdexroutev2": {
AccountRequired: []string{
"proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u",
},
},
"dflow": {
AccountRequired: []string{
"DF1ow4tspfHX9JwWJsAb9epbkA8hmpSEAtxXy1V27QBH",
},
},
// TODO: axiom, gmgn, etc.
})
}, shreder.BlocksStats(false), shreder.LogParsedStats(true), shreder.ShowTableLoaded(false))
if err != nil {
panic(err)
}
@@ -46,11 +77,10 @@ func main() {
<-exitSignal
cancel()
}()
// async read from shreder
txCh := make(chan shreder.TxSignalBatch, 1000)
txCh := make(chan shreder.TxSignal, 1000)
go func() {
err := shrederClient.ReadSync(ctx, txCh)
err := shrederClient.ReadEntriesSync(ctx, txCh)
if err != nil {
if !errors.Is(err, context.Canceled) {
panic(err)
@@ -62,9 +92,21 @@ func main() {
select {
case <-ctx.Done():
return
case txBatch := <-txCh:
jsonData, _ := json.MarshalIndent(txBatch, "", " ")
fmt.Println(string(jsonData))
case tx := <-txCh:
//jsonData, _ := json.MarshalIndent(txBatch, "", " ")
if tx.Token0Amount.GreaterThan(decimal.NewFromInt(100)) && (tx.Label == "okxdexroutev2" || tx.Label == "jupiterv6" || tx.Label == "dflow") {
fmt.Println(time.Now(), "===============", tx.TxHash,
"parse time:", tx.Stats.Done.Sub(tx.Stats.Filter),
"decode time:", tx.Stats.Decoded.Sub(tx.Stats.FEC),
"filter time:", tx.Stats.Filter.Sub(tx.Stats.Decoded),
"dataLen", tx.Stats.DataLen, "txCount", tx.Stats.TxCount, "txOffset", tx.Stats.TxOffset, tx.Label, tx.Event, "token:", tx.Token0Amount)
}
//if tx.Token0Amount.GreaterThan(decimal.NewFromInt(100)) && (tx.Label == "okxdexroutev2" || tx.Label == "jupiterv6" || tx.Label == "dflow") {
// fmt.Println(time.Now(), "===============", tx.TxHash,
// tx.Label, tx.Event, "token:", tx.Token0Amount)
//}
//fmt.Println(txBatch[0].TxHash)
}
}
}

6
go.mod
View File

@@ -5,7 +5,9 @@ go 1.25.1
require (
github.com/BlockRazorinc/solana-trader-client-go v0.0.0-20250722092120-44561cb37455
github.com/gagliardetto/solana-go v1.12.0
github.com/mr-tron/base58 v1.2.0
github.com/near/borsh-go v0.3.2-0.20220516180422-1ff87d108454
github.com/panjf2000/ants/v2 v2.11.4
github.com/shopspring/decimal v1.4.0
google.golang.org/grpc v1.75.0
google.golang.org/protobuf v1.36.10
@@ -20,6 +22,7 @@ require (
github.com/gagliardetto/binary v0.8.0 // indirect
github.com/gagliardetto/treeout v0.1.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
@@ -29,7 +32,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/quic-go/quic-go v0.58.0 // indirect
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 // indirect
go.mongodb.org/mongo-driver v1.12.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
@@ -38,6 +41,7 @@ require (
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.44.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/text v0.31.0 // indirect

12
go.sum
View File

@@ -36,6 +36,8 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
@@ -68,11 +70,15 @@ github.com/near/borsh-go v0.3.2-0.20220516180422-1ff87d108454 h1:lFN7TVecCMbCHVN
github.com/near/borsh-go v0.3.2-0.20220516180422-1ff87d108454/go.mod h1:NeMochZp7jN/pYFuxLkrZtmLqbADmnp/y1+/dL+AsyQ=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/panjf2000/ants/v2 v2.11.4 h1:UJQbtN1jIcI5CYNocTj0fuAUYvsLjPoYi0YuhqV/Y48=
github.com/panjf2000/ants/v2 v2.11.4/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/quic-go v0.58.0 h1:ggY2pvZaVdB9EyojxL1p+5mptkuHyX5MOSv4dgWF4Ug=
github.com/quic-go/quic-go v0.58.0/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
@@ -81,8 +87,10 @@ github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091/go.mod h1:Vl
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
@@ -140,6 +148,8 @@ golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@@ -0,0 +1,125 @@
package consts
import "github.com/samlior/libsam/pkg/enum"
var SWQoSFeeAddresses = map[string]string{
"96gYZGLnJYVFmbjzopPSU6QiEV5fGqZNyN9nmNhvrZU5": enum.SWQoSAgentJito,
"HFqU5x63VTqvQss8hp11i4wVV8bD44PvwucfZ2bU7gRe": enum.SWQoSAgentJito,
"Cw8CFyM9FkoMi7K7Crf6HNQqf4uEMzpKw6QNghXLvLkY": enum.SWQoSAgentJito,
"ADaUMid9yfUytqMBgopwjb2DTLSokTSzL1zt6iGPaS49": enum.SWQoSAgentJito,
"DfXygSm4jCyNCybVYYK6DwvWqjKee8pbDmJGcLWNDXjh": enum.SWQoSAgentJito,
"ADuUkR4vqLUMWXxW9gh6D6L8pMSawimctcNZ5pGwDcEt": enum.SWQoSAgentJito,
"DttWaMuVvTiduZRnguLF7jNxTgiMBZ1hyAumKUiL2KRL": enum.SWQoSAgentJito,
"3AVi9Tg9Uo68tJfuvoKvqKNWKkC5wPdSSdeBnizKZ6jT": enum.SWQoSAgentJito,
"6fQaVhYZA4w3MBSXjJ81Vf6W1EDYeUPXpgVQ6UQyU1Av": enum.SWQoSAgent0slot,
"4HiwLEP2Bzqj3hM2ENxJuzhcPCdsafwiet3oGkMkuQY4": enum.SWQoSAgent0slot,
"7toBU3inhmrARGngC7z6SjyP85HgGMmCTEwGNRAcYnEK": enum.SWQoSAgent0slot,
"8mR3wB1nh4D6J9RUCugxUpc6ya8w38LPxZ3ZjcBhgzws": enum.SWQoSAgent0slot,
"6SiVU5WEwqfFapRuYCndomztEwDjvS5xgtEof3PLEGm9": enum.SWQoSAgent0slot,
"TpdxgNJBWZRL8UXF5mrEsyWxDWx9HQexA9P1eTWQ42p": enum.SWQoSAgent0slot,
"D8f3WkQu6dCF33cZxuAsrKHrGsqGP2yvAHf8mX6RXnwf": enum.SWQoSAgent0slot,
"GQPFicsy3P3NXxB5piJohoxACqTvWE9fKpLgdsMduoHE": enum.SWQoSAgent0slot,
"Ey2JEr8hDkgN8qKJGrLf2yFjRhW7rab99HVxwi5rcvJE": enum.SWQoSAgent0slot,
"4iUgjMT8q2hNZnLuhpqZ1QtiV8deFPy2ajvvjEpKKgsS": enum.SWQoSAgent0slot,
"3Rz8uD83QsU8wKvZbgWAPvCNDU6Fy8TSZTMcPm3RB6zt": enum.SWQoSAgent0slot,
"DiTmWENJsHQdawVUUKnUXkconcpW4Jv52TnMWhkncF6t": enum.SWQoSAgent0slot,
"HRyRhQ86t3H4aAtgvHVpUJmw64BDrb61gRiKcdKUXs5c": enum.SWQoSAgent0slot,
"7y4whZmw388w1ggjToDLSBLv47drw5SUXcLk6jtmwixd": enum.SWQoSAgent0slot,
"J9BMEWFbCBEjtQ1fG5Lo9kouX1HfrKQxeUxetwXrifBw": enum.SWQoSAgent0slot,
"8U1JPQh3mVQ4F5jwRdFTBzvNRQaYFQppHQYoH38DJGSQ": enum.SWQoSAgent0slot,
"Eb2KpSC8uMt9GmzyAEm5Eb1AAAgTjRaXWFjKyFXHZxF3": enum.SWQoSAgent0slot,
"FCjUJZ1qozm1e8romw216qyfQMaaWKxWsuySnumVCCNe": enum.SWQoSAgent0slot,
"ENxTEjSQ1YabmUpXAdCgevnHQ9MHdLv8tzFiuiYJqa13": enum.SWQoSAgent0slot,
"6rYLG55Q9RpsPGvqdPNJs4z5WTxJVatMB8zV3WJhs5EK": enum.SWQoSAgent0slot,
"Cix2bHfqPcKcM233mzxbLk14kSggUUiz2A87fJtGivXr": enum.SWQoSAgent0slot,
"HWEoBxYs7ssKuudEjzjmpfJVX7Dvi7wescFsVx2L5yoY": enum.SWQoSAgentBlocxRoute,
"HZTmLyC683y74TW3HtGbNX5orxjm2sPuZBEYwwSgAM8v": enum.SWQoSAgentBlocxRoute,
"FogxVNs6Mm2w9rnGL1vkARSwJxvLE8mujTv3LK8RnUhF": enum.SWQoSAgentBlocxRoute,
"3UQUKjhMKaY2S6bjcQD6yHB7utcZt5bfarRCmctpRtUd": enum.SWQoSAgentBlocxRoute,
"TEMPaMeCRFAS9EKF53Jd6KpHxgL47uWLcpFArU1Fanq": enum.SWQoSAgentNozomi,
"noz3jAjPiHuBPqiSPkkugaJDkJscPuRhYnSpbi8UvC4": enum.SWQoSAgentNozomi,
"noz3str9KXfpKknefHji8L1mPgimezaiUyCHYMDv1GE": enum.SWQoSAgentNozomi,
"noz6uoYCDijhu1V7cutCpwxNiSovEwLdRHPwmgCGDNo": enum.SWQoSAgentNozomi,
"noz9EPNcT7WH6Sou3sr3GGjHQYVkN3DNirpbvDkv9YJ": enum.SWQoSAgentNozomi,
"nozc5yT15LazbLTFVZzoNZCwjh3yUtW86LoUyqsBu4L": enum.SWQoSAgentNozomi,
"nozFrhfnNGoyqwVuwPAW4aaGqempx4PU6g6D9CJMv7Z": enum.SWQoSAgentNozomi,
"nozievPk7HyK1Rqy1MPJwVQ7qQg2QoJGyP71oeDwbsu": enum.SWQoSAgentNozomi,
"noznbgwYnBLDHu8wcQVCEw6kDrXkPdKkydGJGNXGvL7": enum.SWQoSAgentNozomi,
"nozNVWs5N8mgzuD3qigrCG2UoKxZttxzZ85pvAQVrbP": enum.SWQoSAgentNozomi,
"nozpEGbwx4BcGp6pvEdAh1JoC2CQGZdU6HbNP1v2p6P": enum.SWQoSAgentNozomi,
"nozrhjhkCr3zXT3BiT4WCodYCUFeQvcdUkM7MqhKqge": enum.SWQoSAgentNozomi,
"nozrwQtWhEdrA6W8dkbt9gnUaMs52PdAv5byipnadq3": enum.SWQoSAgentNozomi,
"nozUacTVWub3cL4mJmGCYjKZTnE9RbdY5AP46iQgbPJ": enum.SWQoSAgentNozomi,
"nozWCyTPppJjRuw2fpzDhhWbW355fzosWSzrrMYB1Qk": enum.SWQoSAgentNozomi,
"nozWNju6dY353eMkMqURqwQEoM3SFgEKC6psLCSfUne": enum.SWQoSAgentNozomi,
"nozxNBgWohjR75vdspfxR5H9ceC7XXH99xpxhVGt3Bb": enum.SWQoSAgentNozomi,
"NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE": enum.SWQoSAgentNextBlock,
"NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2": enum.SWQoSAgentNextBlock,
"NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X": enum.SWQoSAgentNextBlock,
"NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb": enum.SWQoSAgentNextBlock,
"neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At": enum.SWQoSAgentNextBlock,
"nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG": enum.SWQoSAgentNextBlock,
"NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid": enum.SWQoSAgentNextBlock,
"nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc": enum.SWQoSAgentNextBlock,
"4ACfpUFoaSD9bfPdeu6DBt89gB6ENTeHBXCAi87NhDEE": enum.SWQoSAgentHelius,
"D2L6yPZ2FmmmTKPgzaMKdhu6EWZcTpLy1Vhx8uvZe7NZ": enum.SWQoSAgentHelius,
"9bnz4RShgq1hAnLnZbP8kbgBg1kEmcJBYQq3gQbmnSta": enum.SWQoSAgentHelius,
"5VY91ws6B2hMmBFRsXkoAAdsPHBJwRfBht4DXox3xkwn": enum.SWQoSAgentHelius,
"2nyhqdwKcJZR2vcqCyrYsaPVdAnFoJjiksCXJ7hfEYgD": enum.SWQoSAgentHelius,
"2q5pghRs6arqVjRvT5gfgWfWcHWmw1ZuCzphgd5KfWGJ": enum.SWQoSAgentHelius,
"wyvPkWjVZz1M8fHQnMMCDTQDbkManefNNhweYk5WkcF": enum.SWQoSAgentHelius,
"3KCKozbAaF75qEU33jtzozcJ29yJuaLJTy2jFdzUY8bT": enum.SWQoSAgentHelius,
"4vieeGHPYPG2MmyPRcYjdiDmmhN3ww7hsFNap8pVN3Ey": enum.SWQoSAgentHelius,
"4TQLFNWK8AovT1gFvda5jfw2oJeRMKEmw7aH6MGBJ3or": enum.SWQoSAgentHelius,
"node1PqAa3BWWzUnTHVbw8NJHC874zn9ngAkXjgWEej": enum.SWQoSAgentNode1,
"node1UzzTxAAeBTpfZkQPJXBAqixsbdth11ba1NXLBG": enum.SWQoSAgentNode1,
"node1Qm1bV4fwYnCurP8otJ9s5yrkPq7SPZ5uhj3Tsv": enum.SWQoSAgentNode1,
"node1PUber6SFmSQgvf2ECmXsHP5o3boRSGhvJyPMX1": enum.SWQoSAgentNode1,
"node1AyMbeqiVN6eoQzEAwCA6Pk826hrdqdAHR7cdJ3": enum.SWQoSAgentNode1,
"node1YtWCoTwwVYTFLfS19zquRQzYX332hs1HEuRBjC": enum.SWQoSAgentNode1,
"node1EoLojAvoUmyDytcvgdXs6GPtY3zpQXPCRVncEA": enum.SWQoSAgentNode1,
"node1CVxtFas2Pw5Vcf86Pq89Hqx4jveo1ntY7ARFMK": enum.SWQoSAgentNode1,
"node1E3hguapYA18HCpEEkRHQmLNiyv9pdfE9s2zo5X": enum.SWQoSAgentNode1,
"node1zrVjcY2XB3Au8qYj5MxjbNfGu3baHaqZMkPM7Z": enum.SWQoSAgentNode1,
"node1FdMPnJBN7QTuhzNw3VS823nxFuDTizrrbcEqzp": enum.SWQoSAgentNode1,
"node1VwH169UqyJHr5MYCH3EBuwrdvn5KHXAkhEEfav": enum.SWQoSAgentNode1,
"node1L7Xat2tSkRNNi6TSuUScMYfj64ovhr2aceJm9g": enum.SWQoSAgentNode1,
"FLasHstqx11M8W56zrSEqkCyhMCCpr6ze6Mjdvqope5s": enum.SWQoSAgentFlashBlock,
"FLasHXTqrbNvpWFB6grN47HGZfK6pze9HLNTgbukfPSk": enum.SWQoSAgentFlashBlock,
"FLashhsorBmM9dLpuq6qATawcpqk1Y2aqaZfkd48iT3W": enum.SWQoSAgentFlashBlock,
"FLASHRzANfcAKDuQ3RXv9hbkBy4WVEKDzoAgxJ56DiE4": enum.SWQoSAgentFlashBlock,
"FLAsHZTRcf3Dy1APaz6j74ebdMC6Xx4g6i9YxjyrDybR": enum.SWQoSAgentFlashBlock,
"FLAshyAyBcKb39KPxSzXcepiS8iDYUhDGwJcJDPX4g2B": enum.SWQoSAgentFlashBlock,
"FLaSHJNm5dWYzEgnHJWWJP5ccu128Mu61NJLxUf7mUXU": enum.SWQoSAgentFlashBlock,
"FLaSHR4Vv7sttd6TyDF4yR1bJyAxRwWKbohDytEMu3wL": enum.SWQoSAgentFlashBlock,
"FLaShB3iXXTWE1vu9wQsChUKq3HFtpMAhb8kAh1pf1wi": enum.SWQoSAgentFlashBlock,
"FLAShWTjcweNT4NSotpjpxAkwxUr2we3eXQGhpTVzRwy": enum.SWQoSAgentFlashBlock,
"FjmZZrFvhnqqb9ThCuMVnENaM3JGVuGWNyCAxRJcFpg9": enum.SWQoSAgentBlockRazor,
"6No2i3aawzHsjtThw81iq1EXPJN6rh8eSJCLaYZfKDTG": enum.SWQoSAgentBlockRazor,
"A9cWowVAiHe9pJfKAj3TJiN9VpbzMUq6E4kEvf5mUT22": enum.SWQoSAgentBlockRazor,
"Gywj98ophM7GmkDdaWs4isqZnDdFCW7B46TXmKfvyqSm": enum.SWQoSAgentBlockRazor,
"68Pwb4jS7eZATjDfhmTXgRJjCiZmw1L7Huy4HNpnxJ3o": enum.SWQoSAgentBlockRazor,
"4ABhJh5rZPjv63RBJBuyWzBK3g9gWMUQdTZP2kiW31V9": enum.SWQoSAgentBlockRazor,
"B2M4NG5eyZp5SBQrSdtemzk5TqVuaWGQnowGaCBt8GyM": enum.SWQoSAgentBlockRazor,
"5jA59cXMKQqZAVdtopv8q3yyw9SYfiE3vUCbt7p8MfVf": enum.SWQoSAgentBlockRazor,
"5YktoWygr1Bp9wiS1xtMtUki1PeYuuzuCF98tqwYxf61": enum.SWQoSAgentBlockRazor,
"295Avbam4qGShBYK7E9H5Ldew4B3WyJGmgmXfiWdeeyV": enum.SWQoSAgentBlockRazor,
"EDi4rSy2LZgKJX74mbLTFk4mxoTgT6F7HxxzG2HBAFyK": enum.SWQoSAgentBlockRazor,
"BnGKHAC386n4Qmv9xtpBVbRaUTKixjBe3oagkPFKtoy6": enum.SWQoSAgentBlockRazor,
"Dd7K2Fp7AtoN8xCghKDRmyqr5U169t48Tw5fEd3wT9mq": enum.SWQoSAgentBlockRazor,
"AP6qExwrbRgBAVaehg4b5xHENX815sMabtBzUzVB4v8S": enum.SWQoSAgentBlockRazor,
"astrazznxsGUhWShqgNtAdfrzP2G83DzcWVJDxwV9bF": enum.SWQoSAgentAstralane,
"astra4uejePWneqNaJKuFFA8oonqCE1sqF6b45kDMZm": enum.SWQoSAgentAstralane,
"astra9xWY93QyfG6yM8zwsKsRodscjQ2uU2HKNL5prk": enum.SWQoSAgentAstralane,
"astraRVUuTHjpwEVvNBeQEgwYx9w9CFyfxjYoobCZhL": enum.SWQoSAgentAstralane,
"astraEJ2fEj8Xmy6KLG7B3VfbKfsHXhHrNdCQx7iGJK": enum.SWQoSAgentAstralane,
"astraubkDw81n4LuutzSQ8uzHCv4BhPVhfvTcYv8SKC": enum.SWQoSAgentAstralane,
"astraZW5GLFefxNPAatceHhYjfA1ciq9gvfEg2S47xk": enum.SWQoSAgentAstralane,
"astrawVNP4xDBKT7rAdxrLYiTSTdqtUr63fSMduivXK": enum.SWQoSAgentAstralane,
"ste11JV3MLMM7x7EJUM2sXcJC1H7F4jBLnP9a9PG8PH": enum.SWQoSAgentStellium,
"ste11MWPjXCRfQryCshzi86SGhuXjF4Lv6xMXD2AoSt": enum.SWQoSAgentStellium,
"ste11p5x8tJ53H1NbNQsRBg1YNRd4GcVpxtDw8PBpmb": enum.SWQoSAgentStellium,
"ste11p7e2KLYou5bwtt35H7BM6uMdo4pvioGjJXKFcN": enum.SWQoSAgentStellium,
"ste11TMV68LMi1BguM4RQujtbNCZvf1sjsASpqgAvSX": enum.SWQoSAgentStellium,
"soyas4s6L8KWZ8rsSk1mF3d1mQScoTGGAgjk98bF8nP": enum.SWQoSAgentSoyas,
}

View File

@@ -12,4 +12,5 @@ const (
SWQoSAgentBlockRazor = "blockrazor"
SWQoSAgentAstralane = "astralane"
SWQoSAgentStellium = "stellium"
SWQoSAgentSoyas = "soyas"
)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,140 @@
package shreder
import (
"context"
"fmt"
"sync"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/panjf2000/ants/v2"
)
type TableInfo struct {
overErrCount int
addresses []solana.PublicKey
}
type AddressTables struct {
showTableLoaded bool
rpcClient *rpc.Client
loadMux sync.Mutex
tables *lru.Cache[solana.PublicKey, *TableInfo]
loading map[solana.PublicKey]struct{}
pool *ants.Pool
}
func NewAddressTables(rpcClient *rpc.Client, showTableLoaded bool) *AddressTables {
pool, _ := ants.NewPool(5, ants.WithPreAlloc(true), ants.WithNonblocking(true))
cache, _ := lru.New[solana.PublicKey, *TableInfo](10000)
return &AddressTables{
rpcClient: rpcClient,
tables: cache,
loading: make(map[solana.PublicKey]struct{}),
pool: pool,
showTableLoaded: showTableLoaded,
}
}
func (at *AddressTables) loadAddressTable(tablePubkey solana.PublicKey) ([]solana.PublicKey, error) {
// decode acc
acc, err := at.rpcClient.GetAccountInfoWithOpts(context.Background(), tablePubkey, &rpc.GetAccountInfoOpts{
Encoding: solana.EncodingBase64,
})
if err != nil {
return nil, err
}
data := acc.GetBinary()
if len(data) <= 56 {
return nil, fmt.Errorf("account data too short")
}
offset := 56
var addresses solana.PublicKeySlice = make([]solana.PublicKey, 0, (len(data)-offset)/32)
for offset+32 <= len(data) {
addresses = append(addresses, solana.PublicKeyFromBytes(data[offset:offset+32]))
offset += 32
}
return addresses, nil
}
func (at *AddressTables) load(tablePubkey solana.PublicKey) {
_ = at.pool.Submit(func() {
at.loadMux.Lock()
_, loading := at.loading[tablePubkey]
if loading {
at.loadMux.Unlock()
return
}
at.loading[tablePubkey] = struct{}{}
at.loadMux.Unlock()
table, err := at.loadAddressTable(tablePubkey)
if err != nil {
logger.Error("loadAddressTable failed", "err", err, "table", tablePubkey)
at.loadMux.Lock()
delete(at.loading, tablePubkey)
at.loadMux.Unlock()
return
}
at.loadMux.Lock()
delete(at.loading, tablePubkey)
at.loadMux.Unlock()
at.tables.Add(tablePubkey, &TableInfo{
addresses: table,
})
if at.showTableLoaded {
total := at.tables.Len()
logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total)
}
})
}
func (at *AddressTables) FillToTx(tx FillableTransaction, tablePubkey solana.PublicKey, idx []uint8) bool {
addresses, ok := at.tables.Get(tablePubkey)
if !ok {
at.load(tablePubkey)
return false
}
for _, i := range idx {
if int(i) >= len(addresses.addresses) {
logger.Error("over loadAddressTable failed", "idx", i, "table", tablePubkey)
addresses.overErrCount++
if addresses.overErrCount > 10 {
at.load(tablePubkey)
}
return false
}
tx.FillLookupTable(addresses.addresses[i])
}
return true
}
func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey {
addresses, ok := at.tables.Get(tablePubkey)
if !ok {
at.load(tablePubkey)
return nil
}
var result solana.PublicKeySlice = make([]solana.PublicKey, 0, len(idx))
for _, i := range idx {
if int(i) >= len(addresses.addresses) {
logger.Error("over loadAddressTable failed", "idx", i, "table", tablePubkey)
addresses.overErrCount++
if addresses.overErrCount > 10 {
at.load(tablePubkey)
}
break
}
result = append(result, addresses.addresses[i])
}
return result
}

View File

@@ -2,35 +2,95 @@ package shreder
import (
"context"
"log/slog"
"fmt"
"runtime"
"time"
"github.com/gagliardetto/solana-go/rpc"
"github.com/panjf2000/ants/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type Client struct {
log *slog.Logger
enableBlockStats bool
enableParseStats bool
conn *grpc.ClientConn
client ShrederServiceClient
tableLoader *AddressTables
subscription map[string]*SubscribeRequestFilterTransactions
pool *ants.Pool
lastSlot uint64
lastSlotTime time.Time
}
type ClientOpts struct {
blockStats bool
showTableLoaded bool
logParseStats bool
}
type ClientOption func(*ClientOpts)
func ShowTableLoaded(enable bool) ClientOption {
return func(opts *ClientOpts) {
opts.showTableLoaded = enable
}
}
func BlocksStats(enable bool) ClientOption {
return func(opts *ClientOpts) {
opts.blockStats = enable
}
}
func LogParsedStats(enable bool) ClientOption {
return func(opts *ClientOpts) {
opts.logParseStats = enable
}
}
func NewShrederClient(
url string,
rpcClient *rpc.Client,
subscription map[string]*SubscribeRequestFilterTransactions,
options ...ClientOption,
) (*Client, func(), error) {
if rpcClient == nil {
return nil, func() {}, fmt.Errorf("rpc client is nil")
}
conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, func() {}, err
}
logger := slog.Default()
poolSize := runtime.NumCPU()*2 + 2
logger.Info("creating shreder client", "url", url, "pool_size", poolSize)
pool, err := ants.NewPool(poolSize, ants.WithNonblocking(true))
if err != nil {
return nil, func() {}, err
}
o := &ClientOpts{
blockStats: false,
showTableLoaded: true,
logParseStats: false,
}
for _, option := range options {
option(o)
}
s := &Client{
log: logger,
conn: conn,
client: NewShrederServiceClient(conn),
subscription: subscription,
tableLoader: NewAddressTables(rpcClient, o.showTableLoaded),
pool: pool,
enableBlockStats: o.blockStats,
enableParseStats: o.logParseStats,
}
return s, func() {
@@ -39,22 +99,63 @@ func NewShrederClient(
}
func (c *Client) Wait() {
c.log.Debug("waiting for shreder client to stop")
logger.Debug("waiting for shreder client to stop")
if c.pool != nil {
c.pool.Release()
}
err := c.conn.Close()
if err != nil {
c.log.Error("failed to close connection: ", "err", err)
logger.Error("failed to close connection: ", "err", err)
}
c.log.Debug("shreder client stopped")
logger.Debug("shreder client stopped")
}
func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error {
func (c *Client) ReadEntriesSync(ctx context.Context, txCh chan<- TxSignal) error {
stream, err := c.client.SubscribeEntries(ctx, &SubscribeEntriesRequest{})
if err != nil {
return err
}
logger.Debug("reading entries from shreder client")
for {
response, err := stream.Recv()
if err != nil {
return err
}
slot := response.Slot
if c.enableBlockStats {
now := time.Now()
if c.lastSlotTime.IsZero() || slot > c.lastSlot {
if !c.lastSlotTime.IsZero() {
logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds())
}
c.lastSlot = slot
c.lastSlotTime = now
}
}
entries := response.Entries
err = c.pool.Submit(func() {
ParseEntries(slot, entries, c.tableLoader, txCh, c.enableParseStats)
})
if err != nil {
return err
}
}
}
func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error {
stream, err := c.client.SubscribeTransactions(ctx)
if err != nil {
return err
}
logger.Debug("subscribing to transactions")
err = stream.Send(&SubscribeTransactionsRequest{
Transactions: c.subscription,
})
@@ -68,20 +169,26 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error
return err
}
txBatch := ParseTransaction(response.Transaction)
if len(txBatch) == 0 {
continue
if c.enableBlockStats {
slot := response.Transaction.Slot
now := time.Now()
if c.lastSlotTime.IsZero() || slot > c.lastSlot {
if !c.lastSlotTime.IsZero() {
logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds())
}
c.lastSlot = slot
c.lastSlotTime = now
}
}
// set fixed source for tx signals
for _, tx := range txBatch {
tx.Source = "shreder"
txData := response.Transaction
err = c.pool.Submit(func() {
ParseTransaction(txData, c.tableLoader, txCh, c.enableParseStats)
})
if err != nil {
logger.Error("failed to submit transaction: ", "err", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case txCh <- txBatch:
}
}
}

321
pkg/shreder/dflow.go Normal file
View File

@@ -0,0 +1,321 @@
package shreder
import (
"bytes"
"encoding/binary"
"fmt"
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/shopspring/decimal"
)
var (
dflowProgramID = solana.MustPublicKeyFromBase58("DF1ow4tspfHX9JwWJsAb9epbkA8hmpSEAtxXy1V27QBH")
dflowProgramString = dflowProgramID.String()
dflowSwapDisc = []byte{248, 198, 158, 145, 225, 117, 135, 200}
dflowSwap2Disc = []byte{65, 75, 63, 76, 235, 91, 91, 136}
dflowSwapWithDestinationDisc = []byte{168, 172, 24, 77, 197, 156, 135, 101}
dflowSwapWithDestinationNative = []byte{205, 77, 127, 108, 241, 32, 196, 195}
dflowSwap2WithDestinationDisc = []byte{95, 123, 213, 246, 122, 1, 86, 231}
dflowSwap2WithDestinationNative = []byte{222, 100, 184, 146, 186, 196, 105, 165}
wrappedSOL = solana.MustPublicKeyFromBase58("So11111111111111111111111111111111111111112")
)
// Action enum tags (0-based, per dflow_idl Action variants)
const (
ActWhirlpoolsSwap uint8 = iota
ActClearpoolsSwap
ActRaydiumAmmSwap
ActLifinityV2Swap
ActMeteoraDlmmSwap
ActRaydiumClmmSwap
ActRaydiumClmmSwapV2
ActPhoenixSwap
ActPumpFunBuy
ActPumpFunSell
ActGammaSwap
ActObricV2Swap
ActPumpFunAmmBuy
ActPumpFunAmmSell
ActSolFiSwap
ActRubiconSwap
ActMeteoraDammV1Swap
ActRaydiumCpSwap
ActStabbleStableSwap
ActTesseraVSwap
ActMeteoraDammV2Swap
ActRaydiumLaunchlabSwap
ActMeteoraDbcSwap
ActHumidiFiSwap
ActWhirlpoolsSwapV2
ActMeteoraDlmmSwapV2
ActZeroFiSwap
ActAlphaQSwap
ActTokenSwap
ActSolFiV2Swap
ActMozartSwap
ActDFlowDynamicRouteV1
ActHeavenSwap
ActNexusSwap
ActSarosDlmmSwap
ActTransferFee
ActTransferFeeWithMint
ActRecordId
ActRecordId2
ActManifestSwap
ActBisonFiSwap
ActSanctumInfinitySwap
ActSanctumInfinityLiquidity
ActOpenPredictionsOrder
ActScorchSwap
ActIncludeAccount
)
// DynamicRouteV1CandidateAction tags
const (
drv1SolFi uint8 = iota
drv1Rubicon
drv1TesseraV
drv1HumidiFi
drv1SolFiV2
drv1Mozart
drv1ObricV2
drv1Nexus
)
// PumpFunAmmSellOptions { amount: u64, orchestrator_flags: OrchestratorFlags{flags u8} }
type pumpFunAmm struct {
Amount uint64
Flags uint8
}
type dflowAction struct {
Tag uint8
Pump *pumpFunAmm
}
type dflowSwapParams struct {
Actions []dflowAction
}
// bytes to skip for Action variants before/after PumpFunAmmSell; only PumpFunAmmSell is decoded.
func skipDflowAction(dec *bin.Decoder, tag uint8) (*pumpFunAmm, error) {
switch tag {
case ActWhirlpoolsSwap, ActClearpoolsSwap, ActWhirlpoolsSwapV2:
// amount u64 + bool + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1 + 1)
case ActRaydiumAmmSwap, ActLifinityV2Swap, ActPumpFunBuy, ActPumpFunSell, ActObricV2Swap,
ActSolFiSwap, ActRubiconSwap, ActMeteoraDammV1Swap, ActRaydiumCpSwap,
ActStabbleStableSwap, ActTesseraVSwap, ActMeteoraDammV2Swap, ActRaydiumLaunchlabSwap,
ActZeroFiSwap, ActAlphaQSwap, ActTokenSwap, ActSolFiV2Swap, ActMozartSwap, ActHeavenSwap,
ActNexusSwap, ActSarosDlmmSwap, ActManifestSwap, ActBisonFiSwap:
// amount u64 + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1)
case ActMeteoraDlmmSwap, ActRaydiumClmmSwap, ActRaydiumClmmSwapV2, ActMeteoraDlmmSwapV2:
// amount u64 + u8 + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1 + 1)
case ActPhoenixSwap:
// amount u64 + side u8 + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1 + 1)
case ActGammaSwap:
// amount u64 + endorsed bool + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1 + 1)
case ActPumpFunAmmSell, ActPumpFunAmmBuy:
amt, err := dec.ReadUint64(binary.LittleEndian)
if err != nil {
return nil, err
}
flg, err := dec.ReadUint8()
if err != nil {
return nil, err
}
return &pumpFunAmm{Amount: amt, Flags: flg}, nil
case ActMeteoraDbcSwap:
// amount u64 + is_rate_limiter_applied bool + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1 + 1)
case ActHumidiFiSwap:
// amount u64 + swap_id u64 + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 8 + 1)
case ActDFlowDynamicRouteV1:
// candidate_actions Vec<DynamicRouteV1CandidateAction> + amount u64 + orchestrator_flags u8
ln, err := dec.ReadUint32(binary.LittleEndian)
if err != nil {
return nil, err
}
for j := uint32(0); j < ln; j++ {
t, err := dec.ReadUint8()
if err != nil {
return nil, err
}
if t == drv1HumidiFi {
if err := dec.SkipBytes(8); err != nil {
return nil, err
}
}
// other variants carry no payload
}
if err := dec.SkipBytes(8); err != nil { // amount
return nil, err
}
return nil, dec.SkipBytes(1) // orchestrator_flags
case ActTransferFee, ActTransferFeeWithMint:
return nil, dec.SkipBytes(8)
case ActRecordId:
return nil, dec.SkipBytes(76)
case ActRecordId2:
return nil, dec.SkipBytes(4)
case ActSanctumInfinitySwap:
// amount u64 + src_lst_value_calc_accs u8 + dst_lst_value_calc_accs u8 + src_lst_index u32 + dst_lst_index u32 + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1 + 1 + 4 + 4 + 1)
case ActSanctumInfinityLiquidity:
// amount u64 + lst_value_calc_accs u8 + lst_index u32 + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 1 + 4 + 1)
case ActOpenPredictionsOrder:
// nonce u64 + order_outcome u8 + quoted_out_amount u64 + slippage_bps u16 + platform_fee_recipient_vault pubkey(32) + platform_fee_scale u16
return nil, dec.SkipBytes(8 + 1 + 8 + 2 + 32 + 2)
case ActScorchSwap:
// amount u64 + id u128 + orchestrator_flags u8
return nil, dec.SkipBytes(8 + 16 + 1)
case ActIncludeAccount:
return nil, nil
default:
return nil, fmt.Errorf("unsupported action tag %d", tag)
}
}
// SwapParams: actions Vec<Action>, quoted_out_amount u64, slippage_bps u16, platform_fee_bps u16
func decodeSwapParams(data []byte) (*dflowSwapParams, error) {
dec := bin.NewBorshDecoder(data)
out := &dflowSwapParams{}
ln, err := dec.ReadUint32(binary.LittleEndian)
if err != nil {
return nil, err
}
out.Actions = make([]dflowAction, 0, ln)
for i := uint32(0); i < ln; i++ {
tag, err := dec.ReadUint8()
if err != nil {
return nil, fmt.Errorf("actions[%d] tag: %w", i, err)
}
pump, err := skipDflowAction(dec, tag)
if err != nil {
return nil, fmt.Errorf("actions[%d]: %w", i, err)
}
out.Actions = append(out.Actions, dflowAction{Tag: tag, Pump: pump})
}
return out, nil
}
// Swap2Params: actions Vec<Action>, quoted_out_amount u64, slippage_bps u16, platform_fee_bps u16, positive_slippage_fee_limit_pct u8
func decodeSwap2Params(data []byte) (*dflowSwapParams, error) {
dec := bin.NewBorshDecoder(data)
out := &dflowSwapParams{}
ln, err := dec.ReadUint32(binary.LittleEndian)
if err != nil {
return nil, err
}
out.Actions = make([]dflowAction, 0, ln)
for i := uint32(0); i < ln; i++ {
tag, err := dec.ReadUint8()
if err != nil {
return nil, fmt.Errorf("actions[%d] tag: %w", i, err)
}
pump, err := skipDflowAction(dec, tag)
if err != nil {
return nil, fmt.Errorf("actions[%d]: %w", i, err)
}
out.Actions = append(out.Actions, dflowAction{Tag: tag, Pump: pump})
}
return out, nil
}
func parseDFlowInstruction(tx TransactionGetter, accounts []uint8, data []byte) (*TxSignal, error) {
if len(data) < 8 {
return nil, nil
}
var err error
disc := data[:8]
payload := data[8:]
var params *dflowSwapParams
switch {
case bytes.Equal(disc, dflowSwapDisc), bytes.Equal(disc, dflowSwapWithDestinationDisc), bytes.Equal(disc, dflowSwapWithDestinationNative):
params, err = decodeSwapParams(payload)
case bytes.Equal(disc, dflowSwap2Disc), bytes.Equal(disc, dflowSwap2WithDestinationDisc), bytes.Equal(disc, dflowSwap2WithDestinationNative):
params, err = decodeSwap2Params(payload)
default:
return nil, nil
}
if err != nil {
return nil, err
}
if params == nil {
return nil, nil
}
var pump *pumpFunAmm
for _, act := range params.Actions {
if act.Tag == ActPumpFunAmmSell && act.Pump != nil {
pump = act.Pump
break
}
}
if pump == nil {
return nil, nil // only care about PumpFunAmmSell
}
// Require WSOL pair when destination mint is provided.
var (
srcIdx uint8
)
if len(accounts) <= 6 {
return nil, nil
}
accounts = accounts[5:]
for i, acctIdx := range accounts {
key, err := tx.GetAccount(acctIdx) //getStaticKey(tx.Message.StaticAccountKeys, int(acctIdx))
if err != nil {
return nil, err
}
if key.Equals(pumpAmmProgramID) {
srcIdx = uint8(i + 4)
break
}
}
if srcIdx == 0 || srcIdx+1 >= uint8(len(accounts)) {
return nil, nil
}
baseMint, err := tx.GetAccount(accounts[srcIdx]) // getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx]))
if err != nil {
return nil, err
}
quoteMint, err := tx.GetAccount(accounts[srcIdx+1]) // getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx+1]))
if err != nil {
return nil, err
}
if !quoteMint.Equals(solana.WrappedSol) {
return nil, nil
}
maker, _ := tx.GetAccount(0)
// Build TxSignal
sig := &TxSignal{
TxHash: tx.Signatures(),
Maker: maker.String(),
Program: "PumpAMM",
Event: "sell",
Token0Address: baseMint.String(),
Token1Address: wsolMint,
Token0Amount: formatTokenAmount(pump.Amount),
Token1Amount: decimal.Zero,
Token0AmountUint64: pump.Amount,
Block: tx.Block(),
Token1AmountUint64: 0,
}
return sig, nil
}

File diff suppressed because one or more lines are too long

334
pkg/shreder/entry.go Normal file
View File

@@ -0,0 +1,334 @@
package shreder
import (
"encoding/binary"
"fmt"
"io"
)
type constArray struct {
data []byte
size int
offset int
}
func newConstArray(data []byte) constArray {
return constArray{
data: data,
size: len(data),
offset: 0,
}
}
func (c *constArray) Len() int {
return c.size
}
func (c *constArray) Offset() int {
return c.offset
}
func (c *constArray) Read(cap int) ([]byte, error) {
if c.offset+cap > c.size {
return nil, io.EOF
}
c.offset += cap
return c.data[c.offset-cap : c.offset], nil
}
func (c *constArray) ReadBytes() (byte, error) {
if c.offset >= c.size {
return 0, io.EOF
}
c.offset++
return c.data[c.offset-1], nil
}
func (c *constArray) PeekBytes() (byte, error) {
if c.offset >= c.size {
return 0, io.EOF
}
return c.data[c.offset], nil
}
func (c *constArray) ReadU64() (uint64, error) {
if c.offset+8 > c.size {
return 0, io.EOF
}
c.offset += 8
return binary.LittleEndian.Uint64(c.data[c.offset-8 : c.offset]), nil
}
func (c *constArray) ReadCompactU16() (uint16, error) {
ln := 0
size := 0
for i := 0; i < 3; i++ {
if len(c.data[c.offset:]) == 0 {
return 0, fmt.Errorf("unable to decode compact u16 at %d: zero byte", i)
}
elem := int(c.data[c.offset+i])
if elem == 0 && i != 0 {
return 0, fmt.Errorf("alias")
}
if i == 2 && (elem&0x80) != 0 {
return 0, fmt.Errorf("byte three continues")
}
ln |= (elem & 0x7f) << (size * 7)
size++
if (elem & 0x80) == 0 {
break
}
}
c.offset += size
return uint16(ln), nil
}
func (c *constArray) Skip(size int) error {
if c.offset+size > c.size {
return io.EOF
}
c.offset += size
return nil
}
// entriesToVersionedTransaction converts raw entry bytes to versioned transactions.
func entriesToVersionedTransaction(slot uint64, b constArray) ([]*versionedTransaction, error) {
b.offset = 0
entriesNum, err := b.ReadU64()
if err != nil {
return nil, fmt.Errorf("unable to read entries num: %w", err)
}
if entriesNum == 0 {
return nil, nil
}
//preCap := b.Len() / 256
//if preCap < int(entriesNum) {
// preCap = int(entriesNum)
//}
vs := make([]*versionedTransaction, 0, entriesNum)
// logger.Debug("parsing entries", "count", entriesNum, "data len", b.Len(), "data", base64.StdEncoding.EncodeToString(b.data))
for i := uint64(0); i < entriesNum; i++ {
err = b.Skip(40)
if err != nil {
return vs, fmt.Errorf("failed to skip num_hashes + hash of entry %d: %w", i, err)
}
numTx, err := b.ReadU64()
if err != nil {
return vs, fmt.Errorf("failed to read num_transactions of entry %d: %w", i, err)
}
for j := 0; j < int(numTx); j++ {
numSignatures, err := b.ReadCompactU16()
if err != nil {
return vs, fmt.Errorf("failed to read numSignatures in entry %d, txn %d: %w", i, j, err)
}
// todo : enforce a maximum number of signatures to prevent OOM
if numSignatures > 16 {
return vs, fmt.Errorf("numSignatures %d exceeds maximum in entry %d, txn %d", numSignatures, i, j)
}
if numSignatures == 0 {
return vs, fmt.Errorf("numSignatures is zero in entry %d, txn %d", i, j)
}
versioned := requireVersionedPool() // get a versioned transaction from the pool
vs = append(vs, versioned)
versioned.block = slot
versioned.bindArray = b.data
versioned.signatures = int(numSignatures)
versioned.signaturesOffset = b.Offset()
err = b.Skip(64 * int(numSignatures))
if err != nil {
return vs, fmt.Errorf("unable to read signature in entry %d, txn %d: %w", i, j, err)
}
msgVersion, err := b.PeekBytes()
if err != nil {
return vs, fmt.Errorf("unable to read message version in entry %d, txn %d: %w", i, j, err)
}
msgVersion = msgVersion & 0x80 >> 7 // mask to get only the version bits
legacy := msgVersion == 0
headerSkip := 3
if !legacy {
headerSkip = 4
}
// skip msg version, mx.Header+3
err = b.Skip(headerSkip)
if err != nil {
return vs, fmt.Errorf("unable to skip message header in entry %d, txn %d: %w", i, j, err)
}
// read mx.AccountKeys
// _, err = r.Read(u16[:])
numAccountKeys, err := b.ReadCompactU16()
// logger.Info("tx", "hash", versioned.Signatures[0].String(), "version", msgVersion)
if err != nil {
return vs, fmt.Errorf("unable to decode numAccountKeys in entry %d, txn %d: %w", i, j, err)
}
// todo : enforce a maximum number of account keys to prevent OOM
if numAccountKeys > 255 {
return vs, fmt.Errorf("numAccountKeys %d exceeds maximum in entry %d, txn %d", numAccountKeys, i, j)
}
versioned.staticAccountKeys = uint8(numAccountKeys)
versioned.staticAccountKeysOffset = b.Offset()
err = b.Skip(32 * int(numAccountKeys))
if err != nil {
return vs, fmt.Errorf("unable to read accountKey in entry %d, txn %d: %w", i, j, err)
}
//skip solana hash
err = b.Skip(32)
if err != nil {
return vs, fmt.Errorf("unable to skip recentBlockhash in entry %d, txn %d: %w", i, j, err)
}
// read mx.Instructions
numInstructions, err := b.ReadCompactU16()
if err != nil {
return vs, fmt.Errorf("unable to decode numInstructions in entry %d, txn %d: %w", i, j, err)
}
// todo : enforce a maximum number of instructions to prevent OOM
if numInstructions >= 256 {
return vs, fmt.Errorf("numInstructions %d exceeds maximum in entry %d, txn %d, txHash: %s", numInstructions, i, j, versioned.Signatures())
}
versioned.instructions = int(numInstructions)
if cap(versioned.Instrs) < int(numInstructions) {
versioned.Instrs = make([]compiledInstruction, numInstructions)
} else {
versioned.Instrs = versioned.Instrs[:numInstructions]
}
for k := 0; k < int(numInstructions); k++ {
versioned.Instrs[k].ProgramIDIndex, err = b.ReadBytes()
if err != nil {
return vs, fmt.Errorf("unable to read mx.Instructions[%d].ProgramIDIndex in entry %d, txn %d: %w", k, i, j, err)
}
numAccounts, err := b.ReadCompactU16()
if err != nil {
return vs, fmt.Errorf("unable to decode numAccounts for ix[%d] in entry %d, txn %d: %w", k, i, j, err)
}
// todo : enforce a maximum number of accounts to prevent OOM
if numAccounts >= 256 {
return vs, fmt.Errorf("numAccounts %d exceeds maximum for ix[%d] in entry %d, txn %d", numAccounts, k, i, j)
}
versioned.Instrs[k].AccountsLen = int(numAccounts)
if numAccounts != 0 {
versioned.Instrs[k].AccountsOffset = b.Offset()
err = b.Skip(int(numAccounts))
if err != nil {
return vs, fmt.Errorf("unable to read mx.Instructions[%d].Accounts in entry %d, txn %d: %w", k, i, j, err)
}
}
// _, err = r.Read(u16[:])
dataLen, err := b.ReadCompactU16()
if err != nil {
return vs, fmt.Errorf("unable to decode mx.Instructions[%d].Data length in entry %d, txn %d: %w", k, i, j, err)
}
//todo : enforce a maximum data length to prevent OOM
if dataLen > 2048 {
return vs, fmt.Errorf("mx.Instructions[%d].Data length %d exceeds maximum in entry %d, txn %d, txHash: %s", k, dataLen, i, j, versioned.Signatures())
}
versioned.Instrs[k].DataLen = int(dataLen)
if dataLen > 0 {
versioned.Instrs[k].DataOffset = b.Offset()
err = b.Skip(int(dataLen))
if err != nil {
return vs, fmt.Errorf("unable to read mx.Instructions[%d].Data in entry %d, txn %d: %w", k, i, j, err)
}
}
}
if !legacy {
// read mx.AddressTableLookups
numLookups, err := b.ReadBytes()
if err != nil {
return vs, fmt.Errorf("unable to read numAddressTableLookups in entry %d, txn %d: %w", i, j, err)
}
if cap(versioned.ATL) < int(numLookups) {
versioned.ATL = make([]addressTableLookup, numLookups)
} else {
versioned.ATL = versioned.ATL[:numLookups]
}
versioned.addressTableLookups = int(numLookups)
for k := uint8(0); k < numLookups; k++ {
versioned.ATL[k].AccountKeyOffset = b.Offset()
err = b.Skip(32)
if err != nil {
return vs, fmt.Errorf("unable to read address table account key for lookup[%d] in entry %d, txn %d: %w", k, i, j, err)
}
numWritable, err := b.ReadCompactU16()
if err != nil {
return vs, fmt.Errorf("unable to decode numWritableIndexes for lookup[%d] in entry %d, txn %d: %w", k, i, j, err)
}
// todo : enforce a maximum number of writable indexes to prevent OOM
if numWritable >= 256 {
return vs, fmt.Errorf("numWritableIndexes %d exceeds maximum for lookup[%d] in entry %d, txn %d", numWritable, k, i, j)
}
versioned.ATL[k].WriteLen = int(numWritable)
if numWritable > 0 {
versioned.ATL[k].WriteOffset = b.Offset()
err = b.Skip(int(numWritable))
if err != nil {
return vs, fmt.Errorf("unable to read writableIndexes for lookup[%d] in entry %d, txn %d: %w", k, i, j, err)
}
}
// _, err = r.Read(u16[:])
numReadonly, err := b.ReadCompactU16()
if err != nil {
return vs, fmt.Errorf("unable to decode numReadonlyIndexes for lookup[%d] in entry %d, txn %d: %w", k, i, j, err)
}
// todo : enforce a maximum number of readonly indexes to prevent OOM
if numReadonly > 256 {
return vs, fmt.Errorf("numReadonlyIndexes %d exceeds maximum for lookup[%d] in entry %d, txn %d", numReadonly, k, i, j)
}
versioned.ATL[k].ReadLen = int(numReadonly)
if numReadonly > 0 {
versioned.ATL[k].ReadOffset = b.Offset()
err = b.Skip(int(numReadonly))
if err != nil {
return vs, fmt.Errorf("unable to read readonlyIndexes for lookup[%d] in entry %d, txn %d: %w", k, i, j, err)
}
}
}
}
}
}
// logger.Debug("parsing num_transactions of entry", "slot", slot, "count", entriesNum, "data len", b.Len(), "num_tx", uint32(len(vs)))
//logger.Debug("parsing entries", "slot", slot, "count", entriesNum, "data len", b.Len(), "txns", len(vs))
return vs, nil
}
func decodeCompactU16(b []byte) (int, uint16, error) {
ln := 0
size := 0
for i := 0; i < 3; i++ {
if len(b) == 0 {
return 0, 0, fmt.Errorf("unable to decode compact u16 at %d: zero byte", i)
}
elem := int(b[0])
b = b[1:]
if elem == 0 && i != 0 {
return 0, 0, fmt.Errorf("alias")
}
if i == 2 && (elem&0x80) != 0 {
return 0, 0, fmt.Errorf("byte three continues")
}
ln |= (elem & 0x7f) << (size * 7)
size++
if (elem & 0x80) == 0 {
break
}
}
return size, uint16(ln), nil
}

41
pkg/shreder/entry_test.go Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

1069
pkg/shreder/juptierv6.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,88 @@
package shreder
import (
"encoding/hex"
"testing"
)
func TestDecodeRouteV2Arg(t *testing.T) {
tests := []struct {
name string
hexData string
}{
{
name: "Jupiter V6 RouteV2Arg Test 0",
hexData: "bb64facc31c4af14809fd500000000002222e8db1800000064000a000000020000005601fe102700016310270102",
},
{
name: "Jupiter V6 RouteV2Arg Test 1",
hexData: "bb64facc31c4af144ff91634b90000004e6c4d05000000002c013200000003000000520000000000000000102700014f102701024310270203",
},
{
name: "Jupiter V6 RouteV2Arg Test 2",
hexData: "bb64facc31c4af14ba2eafa02c1d0000777a9b2200000000f4010a0000000100000052000000000000000010270001",
},
{
name: "Jupiter V6 RouteV2Arg Test 3",
hexData: "bb64facc31c4af144a3521186b07000030508d0e00000000c201320000000300000052000000000000000010270001740110270102590010270203",
},
{
name: "Jupiter V6 RouteV2Arg Test 4",
hexData: "bb64facc31c4af14092d05050000000013701f198c0100008102380100000300000059011027000168001027010251000000000000000010270203",
},
{
name: "Jupiter V6 RouteV2Arg Test 5",
hexData: "bb64facc31c4af1480969800000000006f44ad39bd0000001202320000000200000068001027000151000000000000000010270102",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
instrData, err := hex.DecodeString(tt.hexData)
if err != nil {
t.Fatalf("failed to decode hex string: %v", err)
return
}
t.Logf("raw bytes: %x", instrData[8:])
args, err := decodeJupiterV6RouteV2Arg(instrData[8:])
if err != nil {
t.Fatalf("failed to decode jupiter arguments: %v", err)
return
}
t.Logf("decoded args: %+v", args)
})
}
}
func TestDecodeRouteArg(t *testing.T) {
tests := []struct {
name string
hexData string
}{
{
name: "Jupiter V6 RouteArg Test 0",
hexData: "e517cb977ae3ad2a030000004f6400014f64010251000000000000000064020340420f00000000005c1c81900e000000640000",
},
{
name: "Jupiter V6 RouteArg Test 1",
hexData: "e517cb977ae3ad2a0200000028640001510000000000000000640102c09ee605000000005e1bc48efa000000d00700",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
instrData, err := hex.DecodeString(tt.hexData)
if err != nil {
t.Fatalf("failed to decode hex string: %v", err)
return
}
t.Logf("raw bytes: %x", instrData[8:])
args, err := decodeJupiterV6RouteArg(instrData[8:])
if err != nil {
t.Fatalf("failed to decode jupiter arguments: %v", err)
return
}
t.Logf("decoded args: %+v", args)
})
}
}

View File

@@ -0,0 +1,366 @@
package shreder
import (
"bytes"
"encoding/binary"
"fmt"
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/shopspring/decimal"
)
var (
okxDexRouteV2ProgramID = solana.MustPublicKeyFromBase58("proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u")
okxDexRouteV2ProgramIDString = okxDexRouteV2ProgramID.String()
okxSwapTobDisc = []byte{170, 41, 85, 177, 132, 80, 31, 53}
okxSwapTobWithReceiverDisc = []byte{223, 170, 216, 234, 204, 6, 241, 25}
okxSwapTocDisc = []byte{187, 201, 212, 51, 16, 155, 236, 60}
okxSwapTocV2Disc = []byte{127, 214, 107, 189, 23, 90, 47, 104}
)
// IDL: SwapArgs { order_id:u64, amount_in:u64, expect_amount_out:u64, slippage:u16, routes: Vec<Route> }
// IDL: Route { dex: Dex(enum), weight:u16, index:u8 }
type OkxV2Route struct {
Dex OkxV2SwapKind
Weight uint16
Index uint8
}
type OkxV2SwapArgs struct {
OrderID uint64
AmountIn uint64
ExpectAmountOut uint64
Slippage uint16
Routes []OkxV2Route
}
type OkxV2SwapKind uint8
const (
OKCV2_SplTokenSwap OkxV2SwapKind = iota
OKCV2_StableSwap
OKCV2_Whirlpool
OKCV2_MeteoraDynamicpool
OKCV2_RaydiumSwap
OKCV2_RaydiumStableSwap
OKCV2_RaydiumClmmSwap
OKCV2_AldrinExchangeV1
OKCV2_AldrinExchangeV2
OKCV2_LifinityV1
OKCV2_LifinityV2
OKCV2_RaydiumClmmSwapV2
OKCV2_FluxBeam
OKCV2_MeteoraDlmm
OKCV2_RaydiumCpmmSwap
OKCV2_OpenBookV2
OKCV2_WhirlpoolV2
OKCV2_Phoenix
OKCV2_ObricV2
OKCV2_SanctumAddLiq
OKCV2_SanctumRemoveLiq
OKCV2_SanctumNonWsolSwap
OKCV2_SanctumWsolSwap
OKCV2_PumpfunBuy
OKCV2_PumpfunSell
OKCV2_StabbleSwap
OKCV2_SanctumRouter
OKCV2_MeteoraVaultDeposit
OKCV2_MeteoraVaultWithdraw
OKCV2_Saros
OKCV2_MeteoraLst
OKCV2_Solfi
OKCV2_QualiaSwap
OKCV2_Zerofi
OKCV2_PumpfunammBuy
OKCV2_PumpfunammSell
OKCV2_Virtuals
OKCV2_VertigoBuy
OKCV2_VertigoSell
OKCV2_PerpetualsAddLiq
OKCV2_PerpetualsRemoveLiq
OKCV2_PerpetualsSwap
OKCV2_RaydiumLaunchpad
OKCV2_LetsBonkFun
OKCV2_Woofi
OKCV2_MeteoraDbc
OKCV2_MeteoraDlmmSwap2
OKCV2_MeteoraDAMMV2
OKCV2_Gavel
OKCV2_BoopfunBuy
OKCV2_BoopfunSell
OKCV2_MeteoraDbc2
OKCV2_GooseFX
OKCV2_Dooar
OKCV2_Numeraire
OKCV2_SaberDecimalWrapperDeposit
OKCV2_SaberDecimalWrapperWithdraw
OKCV2_SarosDlmm
OKCV2_OneDexSwap
OKCV2_Manifest
OKCV2_ByrealClmm
OKCV2_PancakeSwapV3Swap
OKCV2_PancakeSwapV3SwapV2
OKCV2_Tessera
OKCV2_SolRfq
OKCV2_Humidifi
OKCV2_HeavenBuy
OKCV2_HeavenSell
OKCV2_SolfiV2
OKCV2_Goonfi
OKCV2_MoonitBuy
OKCV2_MoonitSell
OKCV2_RaydiumSwapV2
OKCV2_Whalestreet
OKCV2_SugarMoneyBuy
OKCV2_SugarMoneySell
OKCV2_MeteoraDAMMV2Swap2
OKCV2_AlphaQ
OKCV2_FutarchyAmm
OKCV2_PumpfunBuy2
OKCV2_PumpfunSell2
OKCV2_HumidifiSwap2
OKCV2_Scorch
OKCV2_JupiterLendDeposit
OKCV2_JupiterLendRedeem
OKCV2_TokkaAmm
)
func decodeOkxSwapTobSwapArgs(data []byte) (*OkxV2SwapArgs, error) {
dec := bin.NewBorshDecoder(data)
return decodeOkxV2SwapArgs(dec)
}
func decodeOkxSwapTobWithReceiverSwapArgs(data []byte) (*OkxV2SwapArgs, error) {
dec := bin.NewBorshDecoder(data)
return decodeOkxV2SwapArgs(dec)
}
func decodeOkxSwapTocSwapArgs(data []byte) (*OkxV2SwapArgs, error) {
dec := bin.NewBorshDecoder(data)
return decodeOkxV2SwapArgs(dec)
}
func decodeOkxSwapTocV2SwapArgs(data []byte) (*OkxV2SwapArgs, error) {
dec := bin.NewBorshDecoder(data)
return decodeOkxV2SwapArgs(dec)
}
func skipOkxV2DexPayload(dec *bin.Decoder, dex OkxV2SwapKind) error {
// IMPORTANT: In IDL, Dex is an enum. Most variants have no fields, but some carry payload.
// We only need to keep decoding aligned for SwapArgs.routes.
switch dex {
case OKCV2_SolRfq:
// fields: 6*u64 + 2*bool
// rfq_id, expected_maker_amount, expected_taker_amount, maker_send_amount,
// taker_send_amount, expiry, maker_use_native_sol, taker_use_native_sol
if err := dec.SkipBytes(8 * 6); err != nil {
return err
}
return dec.SkipBytes(2)
case OKCV2_SugarMoneyBuy, OKCV2_SugarMoneySell:
// fields: u8 + u8
return dec.SkipBytes(2)
case OKCV2_HumidifiSwap2:
// fields: u64
return dec.SkipBytes(8)
case OKCV2_Scorch:
// fields: u128 => 16 bytes
return dec.SkipBytes(16)
default:
return nil
}
}
func decodeOkxV2SwapArgs(dec *bin.Decoder) (*OkxV2SwapArgs, error) {
out := &OkxV2SwapArgs{}
var err error
if out.OrderID, err = dec.ReadUint64(binary.LittleEndian); err != nil {
return nil, fmt.Errorf("read order_id: %w", err)
}
if out.AmountIn, err = dec.ReadUint64(binary.LittleEndian); err != nil {
return nil, fmt.Errorf("read amount_in: %w", err)
}
if out.ExpectAmountOut, err = dec.ReadUint64(binary.LittleEndian); err != nil {
return nil, fmt.Errorf("read expect_amount_out: %w", err)
}
if out.Slippage, err = dec.ReadUint16(binary.LittleEndian); err != nil {
return nil, fmt.Errorf("read slippage: %w", err)
}
// routes: Vec<Route>
routesLen, err := dec.ReadUint32(binary.LittleEndian)
if err != nil {
return nil, fmt.Errorf("read routes len: %w", err)
}
out.Routes = make([]OkxV2Route, 0, routesLen)
for i := uint32(0); i < routesLen; i++ {
// Route { dex: Dex(enum tag u8 [+ payload]), weight: u16, index: u8 }
tag, err := dec.ReadUint8()
if err != nil {
return nil, fmt.Errorf("read routes[%d].dex: %w", i, err)
}
dex := OkxV2SwapKind(tag)
if err := skipOkxV2DexPayload(dec, dex); err != nil {
return nil, fmt.Errorf("skip routes[%d].dex payload (%d): %w", i, tag, err)
}
weight, err := dec.ReadUint16(binary.LittleEndian)
if err != nil {
return nil, fmt.Errorf("read routes[%d].weight: %w", i, err)
}
idx, err := dec.ReadUint8()
if err != nil {
return nil, fmt.Errorf("read routes[%d].index: %w", i, err)
}
out.Routes = append(out.Routes, OkxV2Route{Dex: dex, Weight: weight, Index: idx})
}
return out, nil
}
type OkxV2SwapSolRfq struct {
RfqId uint64
expectedMakerAmount uint64
expectedTakerAmount uint64
makerSendAmount uint64
takerSendAmount uint64
expiry uint64
makerUseNativeSol bool
takerUseNativeSol bool
}
type OkxV2SwapSugarMoney struct {
BondingCurveBump uint8
BondingCurveSolAssociatedAccountBump uint8
}
type OkxV2SwapHumidifiSwap2 struct {
SwapId uint64
}
type OkxV2SwapScorch struct {
Id [16]byte
}
func parseOkxDexRouteV2Instruction(tx TransactionGetter, accounts []uint8, data []byte) (*TxSignal, error) {
if len(data) < 8 {
return nil, nil
}
disc := data[:8]
data = data[8:]
var (
args *OkxV2SwapArgs
err error
)
switch {
case bytes.Equal(disc, okxSwapTobDisc):
args, err = decodeOkxSwapTobSwapArgs(data)
if err != nil {
return nil, fmt.Errorf("decode swap_tob args: %w", err)
}
case bytes.Equal(disc, okxSwapTobWithReceiverDisc):
args, err = decodeOkxSwapTobWithReceiverSwapArgs(data)
if err != nil {
return nil, fmt.Errorf("decode swap_tob_with_receiver args: %w", err)
}
case bytes.Equal(disc, okxSwapTocDisc):
args, err = decodeOkxSwapTocSwapArgs(data)
if err != nil {
return nil, fmt.Errorf("decode swap_toc args: %w", err)
}
case bytes.Equal(disc, okxSwapTocV2Disc):
args, err = decodeOkxSwapTocV2SwapArgs(data)
if err != nil {
return nil, fmt.Errorf("decode swap_toc_v2 args: %w", err)
}
default:
return nil, nil
}
if len(accounts) < 15 {
return nil, fmt.Errorf("invalid account count: %d", len(accounts))
}
var (
inputAmount uint64
routeCount int
)
for _, route := range args.Routes {
if route.Index == 1 && (route.Dex == OKCV2_PumpfunammSell ||
route.Dex == OKCV2_PumpfunSell2) {
routeCount++
inputAmount = args.AmountIn * uint64(route.Weight) / 10000
}
}
if routeCount > 1 {
logger.Warn("pumpSwapSell at inputIdx=0: multiple instances found", "tx", tx.Signatures(), "routeCount", routeCount)
return nil, nil
}
if inputAmount == 0 {
return nil, nil
}
srcMint, err := tx.GetAccount(accounts[3]) //getStaticKey(tx.Message.StaticAccountKeys, int(ix.Accounts[3]))
var (
srcIdx uint8
)
if len(accounts) <= 15 {
return nil, nil
}
accounts = accounts[14:]
for i, acctIdx := range accounts {
key, err := tx.GetAccount(acctIdx) // getStaticKey(tx.Message.StaticAccountKeys, int(acctIdx))
if err != nil {
return nil, err
}
if key.Equals(pumpAmmProgramID) {
srcIdx = uint8(i + 6)
break
}
}
if srcIdx == 0 || int(srcIdx+1) >= len(accounts) {
return nil, nil
}
baseMint, err := tx.GetAccount(accounts[srcIdx]) // getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx]))
if err != nil {
return nil, err
}
if !baseMint.Equals(srcMint) {
return nil, nil
}
quoteMint, err := tx.GetAccount(accounts[srcIdx+1]) // getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx+1]))
if err != nil {
return nil, err
}
if !quoteMint.Equals(solana.WrappedSol) {
return nil, nil
}
maker, _ := tx.GetAccount(0)
return &TxSignal{
TxHash: tx.Signatures(),
Maker: maker.String(),
Token0Address: baseMint.String(),
Token1Address: wsolMint,
Token0Amount: formatTokenAmount(inputAmount),
Token1Amount: decimal.Zero,
Event: "sell",
Program: "PumpAMM",
IsProcessed: false,
IsToken2022: false,
IsMayhemMode: false,
ExactSOL: false,
Token0AmountUint64: inputAmount,
Block: tx.Block(),
Token1AmountUint64: 0,
}, nil
}

View File

@@ -1,6 +1,8 @@
package shreder
import (
"log/slog"
"os"
"time"
"github.com/shopspring/decimal"
@@ -11,8 +13,23 @@ const (
SolDecimals = 9
)
var (
logger *slog.Logger
)
func init() {
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
logger = slog.New(handler)
}
func SetLogLevel(level slog.Level) {
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
logger = slog.New(handler)
}
type TxSignal struct {
Source string `json:"source"`
Label string `json:"label"`
TxHash string `json:"tx_hash"`
Maker string `json:"maker"`
Token0Address string `json:"token0_address"`
@@ -34,6 +51,8 @@ type TxSignal struct {
// parsed values
Token0AmountUint64 uint64 `json:"-"`
Token1AmountUint64 uint64 `json:"-"`
Stats Stats `json:"-"`
}
func (t *TxSignal) Parse() *TxSignal {

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,291 @@
package shreder
import (
"context"
"encoding/hex"
"os"
"testing"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/near/borsh-go"
)
func TestDecodeAxiomArgs(t *testing.T) {
tests := []struct {
name string
hexData string
}{
{
name: "pump amm sell Test 0",
hexData: "00686f08bb1b0000007eb4ac020000000001020200183c",
},
{
name: "pump amm buy Test 1",
hexData: "00c09ee6050000000001c94d882600000000020200323c",
},
{
name: "pump buy Test 2",
hexData: "00d8d3bc0000000000bb7c53f009000000000104185a",
},
{
name: "pump sell Test 3",
hexData: "009bbf69ec08080000830bc61200000000010103a000",
},
{
name: "pump swap sell Test 4",
hexData: "00c98ea7588b0000009adf3b010000000001020200283c",
},
{
name: "pump swap sell Test 5",
hexData: "00d3727f9301000000f9a50b0100000000010202001e00",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := hex.DecodeString(tt.hexData)
if err != nil {
t.Fatalf("failed to decode hex string: %v", err)
return
}
var args flasArgs
if err := borsh.Deserialize(&args, data[1:]); err != nil {
t.Fatalf("failed to decode Axiom args: %v", err)
return
}
t.Logf("Decoded Axiom Args: %+v", args)
})
}
}
func toUpdata(slot uint64, tx *solana.Transaction) *SubscribeUpdateTransaction {
signatures := make([][]byte, len(tx.Signatures))
for i, sig := range tx.Signatures {
signatures[i] = sig[:]
}
accountKeys := make([][]byte, len(tx.Message.AccountKeys))
for i, key := range tx.Message.AccountKeys {
accountKeys[i] = key[:]
}
instructions := make([]*CompiledInstruction, len(tx.Message.Instructions))
for i, instr := range tx.Message.Instructions {
accounts := make([]byte, len(instr.Accounts))
for j, acc := range instr.Accounts {
accounts[j] = byte(acc)
}
instructions[i] = &CompiledInstruction{
ProgramIdIndex: uint32(instr.ProgramIDIndex),
Accounts: accounts,
Data: instr.Data[:],
}
}
addressTableLookups := make([]*MessageAddressTableLookup, len(tx.Message.AddressTableLookups))
for i, lookup := range tx.Message.AddressTableLookups {
writable := make([]byte, len(lookup.WritableIndexes))
for j, idx := range lookup.WritableIndexes {
writable[j] = byte(idx)
}
readonly := make([]byte, len(lookup.ReadonlyIndexes))
for j, idx := range lookup.ReadonlyIndexes {
readonly[j] = byte(idx)
}
addressTableLookups[i] = &MessageAddressTableLookup{
AccountKey: lookup.AccountKey[:],
WritableIndexes: writable,
ReadonlyIndexes: readonly,
}
}
return &SubscribeUpdateTransaction{
Transaction: &Transaction{
Signatures: signatures,
Message: &Message{
Header: &MessageHeader{
NumRequiredSignatures: uint32(tx.Message.Header.NumRequiredSignatures),
NumReadonlySignedAccounts: uint32(tx.Message.Header.NumReadonlySignedAccounts),
NumReadonlyUnsignedAccounts: uint32(tx.Message.Header.NumReadonlyUnsignedAccounts),
},
AccountKeys: accountKeys,
RecentBlockhash: nil, // TODO
Instructions: instructions,
Versioned: false, // TODO
AddressTableLookups: addressTableLookups,
},
},
Slot: slot,
}
}
func getTransaction(t *testing.T, client *rpc.Client, signature string) *SubscribeUpdateTransaction {
version := uint64(0)
tx, err := client.GetTransaction(
context.Background(),
solana.MustSignatureFromBase58(signature),
&rpc.GetTransactionOpts{
Commitment: rpc.CommitmentFinalized,
MaxSupportedTransactionVersion: &version,
},
)
if err != nil {
t.Fatalf("failed to get transaction: %v", err)
}
_tx, err := tx.Transaction.GetTransaction()
if err != nil {
t.Fatalf("failed to get transaction: %v", err)
}
return toUpdata(tx.Slot, _tx)
}
func TestParseTermBuy(t *testing.T) {
rpcUrl := os.Getenv("SOL_RPC_URL")
if rpcUrl == "" {
t.Fatalf("SOL_RPC_URL is not set")
}
client := rpc.New(rpcUrl)
txChannel := make(chan TxSignal, 1)
go func() {
ParseTransaction(
getTransaction(t, client, "5Gz1fa4Qhb35bkg9QCMXpxCX5uuNr7WcjcmrwajGZA7kXsvNS9pDnYe12ggWeSqf1nwZbVPob6DkX6fcwbE9ofBR"),
nil, txChannel,
false,
)
}()
signal := <-txChannel
if signal.Label != "terminal" {
t.Fatalf("expected terminal signal, got %s", signal.Label)
}
if signal.Event != "buy" {
t.Fatalf("expected buy event, got %s", signal.Event)
}
if signal.Maker != "BaLxyjXzATAnfm7cc5AFhWBpiwnsb71THcnofDLTWAPK" {
t.Fatalf("expected maker BaLxyjXzATAnfm7cc5AFhWBpiwnsb71THcnofDLTWAPK, got %s", signal.Maker)
}
if signal.Token0Address != "5Wgv54peXRKDHYHapAELzgNKEPEh9E5Bf3hUR3sTpump" {
t.Fatalf("expected token0 address 5Wgv54peXRKDHYHapAELzgNKEPEh9E5Bf3hUR3sTpump, got %s", signal.Token0Address)
}
if signal.Token0AmountUint64 != 6952026214256 {
t.Fatalf("expected token0 amount 6952026214256, got %d", signal.Token0AmountUint64)
}
if signal.Token1AmountUint64 != 653333333 {
t.Fatalf("expected token1 amount 653333333, got %d", signal.Token1AmountUint64)
}
}
func TestParseBonkBuy(t *testing.T) {
rpcUrl := os.Getenv("SOL_RPC_URL")
if rpcUrl == "" {
t.Fatalf("SOL_RPC_URL is not set")
}
client := rpc.New(rpcUrl)
txChannel := make(chan TxSignal, 1)
go func() {
ParseTransaction(
getTransaction(t, client, "3gHF3TA2aA8rpjdmoEs2vA89vrq9J9NnTTUSXHfE6uXcaYP9cJgLtEUjCmsK9EWAyHEg7cEiepehQf4GFv1272jW"),
nil, txChannel,
false,
)
}()
signal := <-txChannel
if signal.Label != "bonk" {
t.Fatalf("expected bonk signal, got %s", signal.Label)
}
if signal.Event != "buy" {
t.Fatalf("expected buy event, got %s", signal.Event)
}
if signal.Maker != "BFobdhAbdBteBuDvHUdBthsQqJyMuWnG9SGUheW1Ni2C" {
t.Fatalf("expected maker BFobdhAbdBteBuDvHUdBthsQqJyMuWnG9SGUheW1Ni2C, got %s", signal.Maker)
}
if signal.Token0Address != "Awupo9Jxe1fsc7eEtCEcN9D3PoyReQhc9WEuEAHXpump" {
t.Fatalf("expected token0 address Awupo9Jxe1fsc7eEtCEcN9D3PoyReQhc9WEuEAHXpump, got %s", signal.Token0Address)
}
if signal.Token0AmountUint64 != 8616799656436 {
t.Fatalf("expected token0 amount 8616799656436, got %d", signal.Token0AmountUint64)
}
if signal.Token1AmountUint64 != 495000000 {
t.Fatalf("expected token1 amount 495000000, got %d", signal.Token1AmountUint64)
}
}
func TestParseBonkSell(t *testing.T) {
rpcUrl := os.Getenv("SOL_RPC_URL")
if rpcUrl == "" {
t.Fatalf("SOL_RPC_URL is not set")
}
client := rpc.New(rpcUrl)
txChannel := make(chan TxSignal, 1)
go func() {
ParseTransaction(
getTransaction(t, client, "3XNi6b3j69SSStqLLRQVH5BNGVfEoFxGCzmpdd5FvrY4kmC8T644WGdEhCH9fAdrxWuR2Mtzgywq8K7qetu5MGyb"),
nil, txChannel,
false,
)
}()
signal := <-txChannel
if signal.Label != "bonk" {
t.Fatalf("expected bonk signal, got %s", signal.Label)
}
if signal.Event != "sell" {
t.Fatalf("expected sell event, got %s", signal.Event)
}
if signal.Maker != "2xTT7XXCEYSCrRb3G4Egc4ZwpCe78qq6r7w6ChZhbTXc" {
t.Fatalf("expected maker 2xTT7XXCEYSCrRb3G4Egc4ZwpCe78qq6r7w6ChZhbTXc, got %s", signal.Maker)
}
if signal.Token0Address != "8pgpJDYuojYXvb8KE4Hv7DCty12FrkqpKChgfHzspump" {
t.Fatalf("expected token0 address 8pgpJDYuojYXvb8KE4Hv7DCty12FrkqpKChgfHzspump, got %s", signal.Token0Address)
}
if signal.Token0AmountUint64 != 6235736929390 {
t.Fatalf("expected token0 amount 6235736929390, got %d", signal.Token0AmountUint64)
}
if signal.Token1AmountUint64 != 1379707703 {
t.Fatalf("expected token1 amount 1379707703, got %d", signal.Token1AmountUint64)
}
}
func TestParsePhotonBuy(t *testing.T) {
rpcUrl := os.Getenv("SOL_RPC_URL")
if rpcUrl == "" {
t.Fatalf("SOL_RPC_URL is not set")
}
client := rpc.New(rpcUrl)
txChannel := make(chan TxSignal, 1)
go func() {
ParseTransaction(
getTransaction(t, client, "4DCEcXAWBxagXoUNGhWsJ7qfxq5SuE5BG2cBDBqAY7sCHkBopaMJu33ZnXnFHqzPMmWxVxq6666KRF4hMHVB33Ux"),
nil, txChannel,
false,
)
}()
signal := <-txChannel
if signal.Label != "photon" {
t.Fatalf("expected terminal signal, got %s", signal.Label)
}
if signal.Event != "buy" {
t.Fatalf("expected buy event, got %s", signal.Event)
}
if signal.Maker != "8sUm7sLf3Steu6oVyVQqoA9GpFcMRz6YhrAidd4x7g7a" {
t.Fatalf("expected maker 8sUm7sLf3Steu6oVyVQqoA9GpFcMRz6YhrAidd4x7g7a, got %s", signal.Maker)
}
if signal.Token0Address != "jx4PF2MwC7AK9S8dTeYm29hM3vAN8Rtfs2VX4Vz5UVj" {
t.Fatalf("expected token0 address jx4PF2MwC7AK9S8dTeYm29hM3vAN8Rtfs2VX4Vz5UVj, got %s", signal.Token0Address)
}
if signal.Token0AmountUint64 != 1796593710706 {
t.Fatalf("expected token0 amount 1796593710706, got %d", signal.Token0AmountUint64)
}
if signal.Token1AmountUint64 != 1955555553 {
t.Fatalf("expected token1 amount 1955555553, got %d", signal.Token1AmountUint64)
}
}

View File

@@ -0,0 +1,205 @@
package clients
import (
"context"
"crypto/ed25519"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"math/big"
"sync"
"time"
"github.com/gagliardetto/solana-go"
"github.com/mr-tron/base58"
"github.com/quic-go/quic-go"
)
const (
alpnTPUProtocolID = "solana-tpu"
defaultServerName = "soyas-landing"
defaultKeepAlive = 25 * time.Second
defaultIdleTimeout = 5 * time.Minute
)
type SoyasClient struct {
endpointAddr string
tlsConfig *tls.Config
quicConfig *quic.Config
connMu sync.RWMutex
conn *quic.Conn
reconnectMu sync.Mutex
}
// Connect creates a client using the whitelisted Solana keypair (base58-encoded secret key) as the mutual-TLS client identity.
func NewSoyasClient(ctx context.Context, url string) *SoyasClient {
cert, err := x509CertificateFromSolanaBase58Key("2ketcrBU1kBvr68sPVYdBdn5ztgg3VBKZP1xa1o5B8w47wemBXH73ZALdmj3ukcGzkxh6DhzLq3myu45XUwW1eNC")
if err != nil {
panic(err)
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: defaultServerName,
InsecureSkipVerify: true,
NextProtos: []string{alpnTPUProtocolID},
MinVersion: tls.VersionTLS13,
}
quicConfig := &quic.Config{
KeepAlivePeriod: defaultKeepAlive,
MaxIdleTimeout: defaultIdleTimeout,
}
client := &SoyasClient{
endpointAddr: url,
tlsConfig: tlsConfig,
quicConfig: quicConfig,
}
if err = client.reconnect(ctx); err != nil {
panic(err)
}
return client
}
// Close closes the underlying QUIC connection (if any). Safe to call multiple times.
func (c *SoyasClient) Close() error {
c.reconnectMu.Lock()
defer c.reconnectMu.Unlock()
c.connMu.Lock()
conn := c.conn
c.conn = nil
c.connMu.Unlock()
if conn == nil {
return nil
}
return conn.CloseWithError(0, "")
}
// SendTransaction sends a signed Solana transaction payload to Soyas.
// The payload should be the raw wire bytes (for example, from solana-go's tx.MarshalBinary()).
// If sending fails, it reconnects once and retries.
func (c *SoyasClient) SendTransaction(ctx context.Context, tx *solana.Transaction) error {
if c.endpointAddr == "" {
return fmt.Errorf("send tx url is empty")
}
raw, err := tx.MarshalBinary()
if err != nil {
return err
}
conn := c.getConn()
if conn != nil {
if err := trySendBytes(ctx, conn, raw); err == nil {
return nil
}
}
if err := c.reconnect(ctx); err != nil {
return err
}
conn = c.getConn()
if conn == nil {
return errors.New("missing QUIC connection")
}
return trySendBytes(ctx, conn, raw)
}
func (c *SoyasClient) SendBundle(ctx context.Context, txs []*solana.Transaction) error {
return fmt.Errorf("soyas client not support send bundle")
}
func (c *SoyasClient) getConn() *quic.Conn {
c.connMu.RLock()
defer c.connMu.RUnlock()
return c.conn
}
func (c *SoyasClient) reconnect(ctx context.Context) error {
c.reconnectMu.Lock()
defer c.reconnectMu.Unlock()
if existing := c.getConn(); existing != nil && existing.Context().Err() == nil {
return nil
}
conn, err := quic.DialAddr(ctx, c.endpointAddr, c.tlsConfig, c.quicConfig)
if err != nil {
return err
}
c.connMu.Lock()
old := c.conn
c.conn = conn
c.connMu.Unlock()
if old != nil {
_ = old.CloseWithError(0, "")
}
return nil
}
func trySendBytes(ctx context.Context, conn *quic.Conn, payload []byte) error {
stream, err := conn.OpenUniStreamSync(ctx)
if err != nil {
return err
}
if _, err := stream.Write(payload); err != nil {
_ = stream.Close()
return err
}
return stream.Close()
}
// x509CertificateFromSolanaBase58Key creates a short-lived self-signed X.509
// certificate whose public key is derived from the provided Solana Ed25519 key.
// The Soyas ingress extracts this public key to identify/allowlist the client.
func x509CertificateFromSolanaBase58Key(apiKeyBase58 string) (tls.Certificate, error) {
raw, err := base58.Decode(apiKeyBase58)
if err != nil {
return tls.Certificate{}, err
}
var seed []byte
switch len(raw) {
case ed25519.SeedSize:
seed = raw
case ed25519.PrivateKeySize:
seed = raw[:ed25519.SeedSize]
default:
return tls.Certificate{}, errors.New("api key must decode to 32 (seed) or 64 (secret) bytes")
}
priv := ed25519.NewKeyFromSeed(seed)
pub := priv.Public().(ed25519.PublicKey)
serial, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
if err != nil {
return tls.Certificate{}, err
}
template := &x509.Certificate{
SerialNumber: serial,
NotBefore: time.Now().Add(-5 * time.Minute),
NotAfter: time.Now().Add(365 * 24 * time.Hour),
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
}
der, err := x509.CreateCertificate(rand.Reader, template, template, pub, priv)
if err != nil {
return tls.Certificate{}, err
}
return tls.Certificate{
Certificate: [][]byte{der},
PrivateKey: priv,
}, nil
}

View File

@@ -24,6 +24,8 @@ func NewSWQoSClient(ctx context.Context, config *SWQoSClientConfig) (SWQoSClient
client = clients.NewAstralaneClient(config.SendTxUrl)
case enum.SWQoSAgentBlocxRoute:
client = clients.NewBloxrouteClient(config.SendTxUrl)
case enum.SWQoSAgentSoyas:
client = clients.NewSoyasClient(ctx, config.SendTxUrl)
case enum.SWQoSAgent0slot, enum.SWQoSAgentJito, enum.SWQoSAgentHelius, enum.SWQoSAgentNozomi, enum.SWQoSAgentStellium:
client = clients.NewHttpClient(config.SendTxUrl, config.SendBundleUrl)
default: