Skip to content

Commit

Permalink
Add query support to / POST payload (webhook) (#191)
Browse files Browse the repository at this point in the history
* Add query support to / POST payload (webhook)

Signed-off-by: Peter Broadhurst <[email protected]>

* Fix typo

Signed-off-by: Vinod Damle <[email protected]>

Co-authored-by: Vinod Damle <[email protected]>
  • Loading branch information
peterbroadhurst and Vinod Damle authored Jan 24, 2022
1 parent a432244 commit 4a72038
Show file tree
Hide file tree
Showing 16 changed files with 844 additions and 49 deletions.
573 changes: 573 additions & 0 deletions go.sum

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions internal/contractgateway/rest2eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
// REST2EthAsyncDispatcher is passed in to process messages over a streaming system with
// a receipt store. Only used for POST methods, when fly-sync is not set to true
type REST2EthAsyncDispatcher interface {
DispatchMsgAsync(ctx context.Context, msg map[string]interface{}, ack, immediateReceipt bool) (*messages.AsyncSentMsg, int, error)
DispatchMsgAsync(ctx context.Context, msg map[string]interface{}, ack, immediateReceipt bool) (messages.WebhookReply, int, error)
}

// rest2EthSyncDispatcher abstracts the processing of the transactions and queries
Expand Down Expand Up @@ -636,7 +636,7 @@ func (r *rest2eth) sendTransaction(res http.ResponseWriter, req *http.Request, f
// We are confident in the re-serialization here as we've deserialized from JSON then built our own structure
msgBytes, _ := json.Marshal(msg)
var mapMsg map[string]interface{}
json.Unmarshal(msgBytes, &mapMsg)
_ = json.Unmarshal(msgBytes, &mapMsg)
if asyncResponse, status, err := r.asyncDispatcher.DispatchMsgAsync(req.Context(), mapMsg, ack, immediateReceipt); err != nil {
r.restErrReply(res, req, err, status)
} else {
Expand Down Expand Up @@ -724,7 +724,7 @@ func (r *rest2eth) lookupTransaction(res http.ResponseWriter, req *http.Request,
return
}

func (r *rest2eth) restAsyncReply(res http.ResponseWriter, req *http.Request, asyncResponse *messages.AsyncSentMsg) {
func (r *rest2eth) restAsyncReply(res http.ResponseWriter, req *http.Request, asyncResponse messages.WebhookReply) {
resBytes, _ := json.Marshal(asyncResponse)
status := 202 // accepted
log.Infof("<-- %s %s [%d]:\n%s", req.Method, req.URL, status, string(resBytes))
Expand Down
14 changes: 7 additions & 7 deletions internal/contractgateway/rest2eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type mockREST2EthDispatcher struct {
deployContractSyncError error
}

func (m *mockREST2EthDispatcher) DispatchMsgAsync(ctx context.Context, msg map[string]interface{}, ack, immediateReceipt bool) (*messages.AsyncSentMsg, int, error) {
func (m *mockREST2EthDispatcher) DispatchMsgAsync(ctx context.Context, msg map[string]interface{}, ack, immediateReceipt bool) (messages.WebhookReply, int, error) {
m.asyncDispatchMsg = msg
m.asyncDispatchAck = ack
return m.asyncDispatchReply, m.asyncDispatchStatus, m.asyncDispatchError
Expand Down Expand Up @@ -712,7 +712,7 @@ func TestDeployContractSyncRemoteRegistryGateway(t *testing.T) {
}
dispatcher := &mockREST2EthDispatcher{
sendTransactionSyncReceipt: receipt,
asyncDispatchStatus: 200,
asyncDispatchStatus: 200,
}

r, router, res, _ := newTestREST2EthAndMsg(dispatcher, from, "", bodyMap)
Expand Down Expand Up @@ -746,7 +746,7 @@ func TestSendTransactionSyncFail(t *testing.T) {
from := "0x66c5fe653e7a9ebb628a6d40f0452d1e358baee8"
dispatcher := &mockREST2EthDispatcher{
sendTransactionSyncError: fmt.Errorf("pop"),
asyncDispatchStatus: 500,
asyncDispatchStatus: 500,
}

r, router, res, req := newTestREST2EthAndMsg(dispatcher, from, to, bodyMap)
Expand Down Expand Up @@ -776,7 +776,7 @@ func TestSendTransactionAsyncFail(t *testing.T) {
to := "0x567a417717cb6c59ddc1035705f02c0fd1ab1872"
from := "0x66c5fe653e7a9ebb628a6d40f0452d1e358baee8"
dispatcher := &mockREST2EthDispatcher{
asyncDispatchError: fmt.Errorf("pop"),
asyncDispatchError: fmt.Errorf("pop"),
asyncDispatchStatus: 500,
}

Expand Down Expand Up @@ -805,7 +805,7 @@ func TestDeployContractAsyncFail(t *testing.T) {
bodyMap["s"] = "testing"
from := "0x66c5fe653e7a9ebb628a6d40f0452d1e358baee8"
dispatcher := &mockREST2EthDispatcher{
asyncDispatchError: fmt.Errorf("pop"),
asyncDispatchError: fmt.Errorf("pop"),
asyncDispatchStatus: 500,
}

Expand Down Expand Up @@ -838,7 +838,7 @@ func TestSendTransactionAsyncBadMethod(t *testing.T) {
to := "0x567a417717cb6c59ddc1035705f02c0fd1ab1872"
from := "0x66c5fe653e7a9ebb628a6d40f0452d1e358baee8"
dispatcher := &mockREST2EthDispatcher{
asyncDispatchError: fmt.Errorf("pop"),
asyncDispatchError: fmt.Errorf("pop"),
asyncDispatchStatus: 500,
}

Expand Down Expand Up @@ -869,7 +869,7 @@ func TestSendTransactionBadContract(t *testing.T) {
from := "0x66c5fe653e7a9ebb628a6d40f0452d1e358baee8"
dispatcher := &mockREST2EthDispatcher{
asyncDispatchStatus: 500,
asyncDispatchError: fmt.Errorf("pop"),
asyncDispatchError: fmt.Errorf("pop"),
}

r, router, res, req := newTestREST2EthAndMsg(dispatcher, from, to, bodyMap)
Expand Down
2 changes: 1 addition & 1 deletion internal/eth/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var defaultSolc string
func getSolcExecutable(requestedVersion string) (string, error) {
log.Infof("Solidity compiler requested: %s", requestedVersion)
if solcVerChecker == nil {
solcVerChecker, _ = regexp.Compile("^([0-9]+)\\.?([0-9]+)")
solcVerChecker, _ = regexp.Compile(`^([0-9]+)\.?([0-9]+)`)
}
defaultSolc = utils.GetenvOrDefaultLowerCase(utils.GetenvOrDefaultUpperCase("PREFIX_SHORT", "fly")+"_SOLC_DEFAULT", "solc")
solc := defaultSolc
Expand Down
1 change: 0 additions & 1 deletion internal/eth/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func RPCConnect(conf *RPCConnOpts) (RPCClientAll, error) {
// CobraInitRPC sets the standard command-line parameters for RPC
func CobraInitRPC(cmd *cobra.Command, rconf *RPCConf) {
cmd.Flags().StringVarP(&rconf.RPC.URL, "rpc-url", "r", os.Getenv("ETH_RPC_URL"), "JSON/RPC URL for Ethereum node")
return
}

// rpc.RPCClient methods with original types that we expose - only used within this package.
Expand Down
26 changes: 26 additions & 0 deletions internal/eth/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/hex"
"math/big"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -100,6 +101,31 @@ func (tx *Txn) Call(ctx context.Context, rpc RPCClient, blocknumber string) (res
return
}

func (tx *Txn) CallAndProcessReply(ctx context.Context, rpc RPCClient, blocknumber string) (map[string]interface{}, error) {
callOption := "latest"
// only allowed values are "earliest/latest/pending", "", a number string "12345" or a hex number "0xab23"
// "latest" and "" (no fly-blocknumber given) are equivalent
if blocknumber != "" && blocknumber != "latest" {
isHex, _ := regexp.MatchString(`^0x[0-9a-fA-F]+$`, blocknumber)
if isHex || blocknumber == "earliest" || blocknumber == "pending" {
callOption = blocknumber
} else {
n := new(big.Int)
n, ok := n.SetString(blocknumber, 10)
if !ok {
return nil, errors.Errorf(errors.TransactionCallInvalidBlockNumber)
}
callOption = ethbind.API.EncodeBig(n)
}
}

retBytes, err := tx.Call(ctx, rpc, callOption)
if err != nil || retBytes == nil {
return nil, err
}
return ProcessRLPBytes(tx.Method.Outputs, retBytes), nil
}

// Send sends an individual transaction, choosing external or internal signing
func (tx *Txn) Send(ctx context.Context, rpc RPCClient) (err error) {
start := time.Now().UTC()
Expand Down
32 changes: 7 additions & 25 deletions internal/eth/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"math/big"
"reflect"
"regexp"
"strconv"
"strings"

Expand All @@ -47,6 +46,7 @@ type Txn struct {
PrivateFor []string
PrivacyGroupID string
Signer TXSigner
Method *ethbinding.ABIMethod
}

// TxnReceipt is the receipt obtained over JSON/RPC from the ethereum client
Expand Down Expand Up @@ -146,28 +146,7 @@ func CallMethod(ctx context.Context, rpc RPCClient, signer TXSigner, from, addr
if err != nil {
return nil, err
}
callOption := "latest"
// only allowed values are "earliest/latest/pending", "", a number string "12345" or a hex number "0xab23"
// "latest" and "" (no fly-blocknumber given) are equivalent
if blocknumber != "" && blocknumber != "latest" {
isHex, _ := regexp.MatchString(`^0x[0-9a-fA-F]+$`, blocknumber)
if isHex || blocknumber == "earliest" || blocknumber == "pending" {
callOption = blocknumber
} else {
n := new(big.Int)
n, ok := n.SetString(blocknumber, 10)
if !ok {
return nil, errors.Errorf(errors.TransactionCallInvalidBlockNumber)
}
callOption = ethbind.API.EncodeBig(n)
}
}

retBytes, err := tx.Call(ctx, rpc, callOption)
if err != nil || retBytes == nil {
return nil, err
}
return ProcessRLPBytes(methodABI.Outputs, retBytes), nil
return tx.CallAndProcessReply(ctx, rpc, blocknumber)
}

// Decode the "input" bytes from a transaction, which are composed of a method ID + encoded arguments
Expand Down Expand Up @@ -391,7 +370,10 @@ func NewNilTX(from string, nonce int64, signer TXSigner) (tx *Txn, err error) {
}

func buildTX(signer TXSigner, msgFrom, msgTo string, msgNonce, msgValue, msgGas, msgGasPrice json.Number, methodABI *ethbinding.ABIMethod, params []interface{}) (tx *Txn, err error) {
tx = &Txn{Signer: signer}
tx = &Txn{
Signer: signer,
Method: methodABI,
}

// Build correctly typed args for the ethereum call
typedArgs, err := tx.generateTypedArgs(params, methodABI)
Expand Down Expand Up @@ -641,7 +623,7 @@ func (tx *Txn) generateTypedArg(requiredType *ethbinding.ABIType, param interfac
if suppliedType.Kind() == reflect.Slice {
paramV := reflect.ValueOf(param)
bSliceLen := paramV.Len()
bSlice = make([]byte, bSliceLen, bSliceLen)
bSlice = make([]byte, bSliceLen)
for i := 0; i < bSliceLen; i++ {
valV := paramV.Index(i)
if valV.Kind() == reflect.Interface {
Expand Down
53 changes: 53 additions & 0 deletions internal/eth/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1908,3 +1908,56 @@ func TestDecodeInputs(t *testing.T) {
assert.NoError(err)
assert.Equal(expectedArgs, args)
}

func TestCallAndProcessReply(t *testing.T) {
qm := &messages.QueryTransaction{
SendTransaction: messages.SendTransaction{
Method: &ethbinding.ABIElementMarshaling{
Name: "method1",
Outputs: []ethbinding.ABIArgumentMarshaling{
{Name: "arg1", Type: "uint256"},
},
},
},
}

rpc := &testRPCClient{
resultWrangler: func(retString interface{}) {
retVal := "0x000000000000000000000000000000000000000000000000000000000003039"
reflect.ValueOf(retString).Elem().Set(reflect.ValueOf(retVal))
},
}

tx, err := NewSendTxn(&qm.SendTransaction, nil)
assert.NoError(t, err)

res, err := tx.CallAndProcessReply(context.Background(), rpc, "latest")
assert.NoError(t, err)
assert.Equal(t, "12345", res["arg1"])

}

func TestCallAndProcessReplyNilRet(t *testing.T) {
qm := &messages.QueryTransaction{
SendTransaction: messages.SendTransaction{
Method: &ethbinding.ABIElementMarshaling{
Name: "method1",
},
},
}

rpc := &testRPCClient{
resultWrangler: func(retString interface{}) {
retVal := ""
reflect.ValueOf(retString).Elem().Set(reflect.ValueOf(retVal))
},
}

tx, err := NewSendTxn(&qm.SendTransaction, nil)
assert.NoError(t, err)

res, err := tx.CallAndProcessReply(context.Background(), rpc, "latest")
assert.NoError(t, err)
assert.Empty(t, res)

}
28 changes: 27 additions & 1 deletion internal/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
MsgTypeDeployContract = "DeployContract"
// MsgTypeSendTransaction - send a transaction
MsgTypeSendTransaction = "SendTransaction"
// MsgTypeQuery - perform a call against the blockchain, and return a result
MsgTypeQuery = "Query"
// MsgTypeTransactionSuccess - a transaction receipt where status is 1
MsgTypeTransactionSuccess = "TransactionSuccess"
// MsgTypeTransactionFailure - a transaction receipt where status is 0
Expand All @@ -37,13 +39,31 @@ const (
RecordHeaderAccessToken = "fly-accesstoken"
)

type WebhookReply interface {
RequestID() string
}

// AsyncSentMsg is a standard response for async requests
type AsyncSentMsg struct {
Sent bool `json:"sent"`
Request string `json:"id"`
Msg string `json:"msg,omitempty"`
}

func (asm *AsyncSentMsg) RequestID() string {
if asm == nil {
return ""
}
return asm.Request
}

// SyncQueryReply payload is constructed by txn.CallMethod
type SyncQueryReply map[string]interface{}

func (sqr SyncQueryReply) RequestID() string {
return "n/a"
}

// CommonHeaders are common to all messages
type CommonHeaders struct {
ID string `json:"id,omitempty"`
Expand Down Expand Up @@ -117,14 +137,20 @@ type TransactionCommon struct {
AckType string `json:"acktype,omitempty"`
}

// SendTransaction message instructs the bridge to install a contract
// SendTransaction message instructs the bridge to invoke a smart contract
type SendTransaction struct {
TransactionCommon
To string `json:"to"`
Method *ethbinding.ABIElementMarshaling `json:"method,omitempty"`
MethodName string `json:"methodName,omitempty"`
}

// QueryTransaction message performs a synchronous invocation call to the blockchain
type QueryTransaction struct {
SendTransaction
BlockNumber string `json:"blockNumber,omitempty"`
}

// DeployContract message instructs the bridge to install a contract
type DeployContract struct {
TransactionCommon
Expand Down
14 changes: 14 additions & 0 deletions internal/messages/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,17 @@ func TestIsReceiptForError(t *testing.T) {
m = &ErrorReply{}
assert.Nil(m.IsReceipt())
}

func TestAsyncSentMsgRequestID(t *testing.T) {
var asm *AsyncSentMsg
assert.Empty(t, asm.RequestID())
asm = &AsyncSentMsg{
Request: "abcd12345",
}
assert.Equal(t, "abcd12345", asm.RequestID())
}

func TestSyncQueryReplyRequestID(t *testing.T) {
var sqn SyncQueryReply
assert.Equal(t, "n/a", sqn.RequestID())
}
6 changes: 3 additions & 3 deletions internal/rest/restgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (g *RESTGateway) sendError(res http.ResponseWriter, msg string, code int) {
}

// DispatchMsgAsync is the rest2eth interface method for async dispatching of messages (via our webhook logic)
func (g *RESTGateway) DispatchMsgAsync(ctx context.Context, msg map[string]interface{}, ack, immediateReceipt bool) (*messages.AsyncSentMsg, int, error) {
func (g *RESTGateway) DispatchMsgAsync(ctx context.Context, msg map[string]interface{}, ack, immediateReceipt bool) (messages.WebhookReply, int, error) {
reply, status, err := g.webhooks.processMsg(ctx, msg, ack, immediateReceipt)
return reply, status, err
}
Expand Down Expand Up @@ -302,10 +302,10 @@ func (g *RESTGateway) Start() (err error) {
g.receipts.addRoutes(router)
if len(g.conf.Kafka.Brokers) > 0 {
wk := newWebhooksKafka(&g.conf.Kafka, g.receipts)
g.webhooks = newWebhooks(wk, g.receipts, g.smartContractGW)
g.webhooks = newWebhooks(wk, g.receipts, g.smartContractGW, rpcClient)
} else {
wd := newWebhooksDirect(&g.conf.WebhooksDirectConf, processor, g.receipts)
g.webhooks = newWebhooks(wd, g.receipts, g.smartContractGW)
g.webhooks = newWebhooks(wd, g.receipts, g.smartContractGW, rpcClient)
}
g.webhooks.addRoutes(router)

Expand Down
2 changes: 1 addition & 1 deletion internal/rest/restgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestDispatchMsgAsyncPassesThroughToWebhooks(t *testing.T) {
g := NewRESTGateway(&printYAML)
fakeHandler := &mockHandler{}
r, _ := newReceiptsTestStore(nil)
g.webhooks = newWebhooks(fakeHandler, r, nil)
g.webhooks = newWebhooks(fakeHandler, r, nil, nil)

var fakeMsg map[string]interface{}
_, status, err := g.DispatchMsgAsync(context.Background(), fakeMsg, true, true)
Expand Down
Loading

0 comments on commit 4a72038

Please sign in to comment.