Skip to content

Commit

Permalink
Merge branch 'master' into rpc-config-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
bifurcation committed Nov 22, 2015
2 parents a468722 + 7e89931 commit ab19e33
Showing 1 changed file with 67 additions and 42 deletions.
109 changes: 67 additions & 42 deletions rpc/amqp-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -214,79 +215,101 @@ type rpcError struct {

// Wraps a error in a rpcError so it can be marshalled to
// JSON.
func wrapError(err error) (rpcError rpcError) {
func wrapError(err error) *rpcError {
if err != nil {
rpcError.Value = err.Error()
wrapped := &rpcError{
Value: err.Error(),
}
switch err.(type) {
case core.InternalServerError:
rpcError.Type = "InternalServerError"
wrapped.Type = "InternalServerError"
case core.NotSupportedError:
rpcError.Type = "NotSupportedError"
wrapped.Type = "NotSupportedError"
case core.MalformedRequestError:
rpcError.Type = "MalformedRequestError"
wrapped.Type = "MalformedRequestError"
case core.UnauthorizedError:
rpcError.Type = "UnauthorizedError"
wrapped.Type = "UnauthorizedError"
case core.NotFoundError:
rpcError.Type = "NotFoundError"
wrapped.Type = "NotFoundError"
case core.SyntaxError:
rpcError.Type = "SyntaxError"
wrapped.Type = "SyntaxError"
case core.SignatureValidationError:
rpcError.Type = "SignatureValidationError"
wrapped.Type = "SignatureValidationError"
case core.CertificateIssuanceError:
rpcError.Type = "CertificateIssuanceError"
wrapped.Type = "CertificateIssuanceError"
case core.NoSuchRegistrationError:
rpcError.Type = "NoSuchRegistrationError"
wrapped.Type = "NoSuchRegistrationError"
case core.TooManyRPCRequestsError:
rpcError.Type = "TooManyRPCRequestsError"
wrapped.Type = "TooManyRPCRequestsError"
case core.RateLimitedError:
rpcError.Type = "RateLimitedError"
wrapped.Type = "RateLimitedError"
case core.ServiceUnavailableError:
rpcError.Type = "ServiceUnavailableError"
wrapped.Type = "ServiceUnavailableError"
}
return wrapped
}
return
return nil
}

// Unwraps a rpcError and returns the correct error type.
func unwrapError(rpcError rpcError) (err error) {
if rpcError.Value != "" {
func unwrapError(rpcError *rpcError) error {
if rpcError != nil {
switch rpcError.Type {
case "InternalServerError":
err = core.InternalServerError(rpcError.Value)
return core.InternalServerError(rpcError.Value)
case "NotSupportedError":
err = core.NotSupportedError(rpcError.Value)
return core.NotSupportedError(rpcError.Value)
case "MalformedRequestError":
err = core.MalformedRequestError(rpcError.Value)
return core.MalformedRequestError(rpcError.Value)
case "UnauthorizedError":
err = core.UnauthorizedError(rpcError.Value)
return core.UnauthorizedError(rpcError.Value)
case "NotFoundError":
err = core.NotFoundError(rpcError.Value)
return core.NotFoundError(rpcError.Value)
case "SyntaxError":
err = core.SyntaxError(rpcError.Value)
return core.SyntaxError(rpcError.Value)
case "SignatureValidationError":
err = core.SignatureValidationError(rpcError.Value)
return core.SignatureValidationError(rpcError.Value)
case "CertificateIssuanceError":
err = core.CertificateIssuanceError(rpcError.Value)
return core.CertificateIssuanceError(rpcError.Value)
case "NoSuchRegistrationError":
err = core.NoSuchRegistrationError(rpcError.Value)
return core.NoSuchRegistrationError(rpcError.Value)
case "TooManyRPCRequestsError":
err = core.TooManyRPCRequestsError(rpcError.Value)
return core.TooManyRPCRequestsError(rpcError.Value)
case "RateLimitedError":
err = core.RateLimitedError(rpcError.Value)
return core.RateLimitedError(rpcError.Value)
case "ServiceUnavailableError":
err = core.ServiceUnavailableError(rpcError.Value)
return core.ServiceUnavailableError(rpcError.Value)
default:
err = errors.New(rpcError.Value)
return errors.New(rpcError.Value)
}
}
return
return nil
}

// rpcResponse is a stuct for wire-representation of response messages
// used by DispatchSync
type rpcResponse struct {
ReturnVal []byte `json:"returnVal,omitempty"`
Error rpcError `json:"error,omitempty"`
ReturnVal []byte `json:"returnVal"`
Error *rpcError `json:"error,omitempty"`
}

// Hack: Some of our RPCs return DER directly. If we log it naively it will
// just be a bunch of numbers. It's easy to detect DER, so we use this function
// before logging to base64-encode anything that looks like DER.
func safeDER(input []byte) string {
if len(input) > 0 && input[0] == 0x30 {
return string(base64.RawStdEncoding.EncodeToString(input))
}
return string(input)
}

// Used for debug logging
func (r rpcResponse) debugString() string {
ret := safeDER(r.ReturnVal)
if r.Error == nil {
return ret
}
return fmt.Sprintf("%s, RPCERR: %s", ret, r.Error)
}

// AmqpChannel sets a AMQP connection up using SSL if configuration is provided
Expand Down Expand Up @@ -365,10 +388,10 @@ func AmqpChannel(conf *cmd.AMQPConfig) (*amqp.Channel, error) {
func (rpc *AmqpRPCServer) processMessage(msg amqp.Delivery) {
// XXX-JWS: jws.Verify(body)
cb, present := rpc.dispatchTable[msg.Type]
rpc.log.Info(fmt.Sprintf(" [s<][%s][%s] received %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId))
rpc.log.Info(fmt.Sprintf(" [s<][%s][%s] received %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, safeDER(msg.Body), msg.CorrelationId))
if !present {
// AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e
rpc.log.Audit(fmt.Sprintf(" [s<][%s][%s] Misrouted message: %s - %s - %s", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId))
rpc.log.Audit(fmt.Sprintf(" [s<][%s][%s] Misrouted message: %s - %s - %s", rpc.serverQueue, msg.ReplyTo, msg.Type, safeDER(msg.Body), msg.CorrelationId))
return
}
var response rpcResponse
Expand All @@ -381,10 +404,7 @@ func (rpc *AmqpRPCServer) processMessage(msg amqp.Delivery) {
rpc.log.Audit(fmt.Sprintf(" [s>][%s][%s] Error condition marshalling RPC response %s [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, msg.CorrelationId))
return
}
if response.Error.Value != "" {
rpc.log.Info(fmt.Sprintf(" [s>][%s][%s] %s failed, replying: %s (%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, response.Error.Value, response.Error.Type, msg.CorrelationId))
}
rpc.log.Debug(fmt.Sprintf(" [s>][%s][%s] replying %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(jsonResponse), msg.CorrelationId))
rpc.log.Debug(fmt.Sprintf(" [s>][%s][%s] replying %s: %s [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, response.debugString(), msg.CorrelationId))
rpc.connection.publish(
msg.ReplyTo,
msg.CorrelationId,
Expand Down Expand Up @@ -589,7 +609,6 @@ func NewAmqpRPCClient(
continue
}

rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID))
responseChan <- msg.Body
rpc.mu.Lock()
delete(rpc.pending, corrID)
Expand Down Expand Up @@ -617,13 +636,18 @@ func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) (string, chan []b
// At least in some cases, it's important that this channel
// be buffered to avoid deadlock
responseChan := make(chan []byte, 1)
corrID := core.NewToken()
corrIDBytes := make([]byte, 8)
_, err := rand.Read(corrIDBytes)
if err != nil {
panic("randomness failed")
}
corrID := base64.RawURLEncoding.EncodeToString(corrIDBytes)
rpc.mu.Lock()
rpc.pending[corrID] = responseChan
rpc.mu.Unlock()

// Send the request
rpc.log.Debug(fmt.Sprintf(" [c>][%s] requesting %s(%s) [%s]", rpc.clientQueue, method, core.B64enc(body), corrID))
rpc.log.Debug(fmt.Sprintf(" [c>][%s] requesting %s(%s) [%s]", rpc.clientQueue, method, safeDER(body), corrID))
rpc.connection.publish(
rpc.serverQueue,
corrID,
Expand All @@ -644,6 +668,7 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b
case jsonResponse := <-responseChan:
var rpcResponse rpcResponse
err = json.Unmarshal(jsonResponse, &rpcResponse)
rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s: %s [%s]", rpc.clientQueue, method, rpcResponse.debugString(), corrID))
if err != nil {
return
}
Expand Down

0 comments on commit ab19e33

Please sign in to comment.