Skip to content

Commit

Permalink
Switched go-cdn-booster from net/http server to fasthttp server
Browse files Browse the repository at this point in the history
  • Loading branch information
valyala committed Oct 19, 2015
1 parent b2811c8 commit 07f1098
Showing 1 changed file with 75 additions and 133 deletions.
208 changes: 75 additions & 133 deletions apps/go/cdn-booster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
package main

import (
"bufio"
"bytes"
"crypto/tls"
"flag"
"fmt"
"github.com/valyala/ybc/bindings/go/ybc"
"github.com/vharitonsky/iniflags"
"io"
"io/ioutil"
"log"
Expand All @@ -38,6 +36,10 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/valyala/fasthttp"
"github.com/valyala/ybc/bindings/go/ybc"
"github.com/vharitonsky/iniflags"
)

var (
Expand All @@ -52,7 +54,6 @@ var (
httpsKeyFile = flag.String("httpsKeyFile", "/etc/ssl/private/ssl-cert-snakeoil.key", "Path to HTTPS server key. Used only if listenHttpsAddr is set")
httpsListenAddrs = flag.String("httpsListenAddrs", "", "A list of TCP addresses to listen to HTTPS requests. Leave empty if you don't need https")
listenAddrs = flag.String("listenAddrs", ":8098", "A list of TCP addresses to listen to HTTP requests. Leave empty if you don't need http")
maxConnsPerIp = flag.Int("maxConnsPerIp", 32, "The maximum number of concurrent connections from a single ip")
maxIdleUpstreamConns = flag.Int("maxIdleUpstreamConns", 50, "The maximum idle connections to upstream host")
maxItemsCount = flag.Int("maxItemsCount", 100*1000, "The maximum number of items in the cache")
readBufferSize = flag.Int("readBufferSize", 1024, "The size of read buffer for incoming connections")
Expand All @@ -73,15 +74,16 @@ var (
)

var (
cache ybc.Cacher
perIpConnTracker = createPerIpConnTracker()
stats Stats
upstreamClient http.Client
cache ybc.Cacher
stats Stats
upstreamClient http.Client
)

func main() {
iniflags.Parse()

upstreamHostBytes = []byte(*upstreamHost)

runtime.GOMAXPROCS(*goMaxProcs)

cache = createCache()
Expand Down Expand Up @@ -180,132 +182,99 @@ func listen(addr string) net.Listener {
}

func serve(ln net.Listener) {
for {
conn, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
logMessage("Cannot accept connections due to temporary network error: [%s]", err)
time.Sleep(time.Second)
continue
}
logFatal("Cannot accept connections due to permanent error: [%s]", err)
}
go handleConnection(conn)
s := &fasthttp.Server{
Handler: requestHandler,
Name: "go-cdn-booster",
}
s.Serve(ln)
}

func handleConnection(conn net.Conn) {
defer conn.Close()
var keyPool sync.Pool

clientAddr := conn.RemoteAddr().(*net.TCPAddr).IP.To4()
ipUint32 := ip4ToUint32(clientAddr)
if perIpConnTracker.registerIp(ipUint32) > *maxConnsPerIp {
logMessage("Too many concurrent connections (more than %d) from ip=%s. Denying new connection from the ip", *maxConnsPerIp, clientAddr)
perIpConnTracker.unregisterIp(ipUint32)
func requestHandler(ctx *fasthttp.ServerCtx) {
h := &ctx.Request.Header
if !h.IsMethodGet() {
ctx.Error("Method not allowed", 405)
return
}
defer perIpConnTracker.unregisterIp(ipUint32)

r := bufio.NewReaderSize(conn, *readBufferSize)
w := bufio.NewWriterSize(conn, *writeBufferSize)
clientAddrStr := clientAddr.String()
for {
req, err := http.ReadRequest(r)
if err != nil {
if err != io.EOF {
logMessage("Error when reading http request from ip=%s: [%s]", clientAddr, err)
}
return
}
req.RemoteAddr = clientAddrStr
ok := handleRequest(req, w)
w.Flush()
if !ok || !req.ProtoAtLeast(1, 1) || req.Header.Get("Connection") == "close" {
return
}
}
}

func handleRequest(req *http.Request, w io.Writer) bool {
if req.Method != "GET" {
w.Write(notAllowedResponseHeader)
return false
}

if req.RequestURI == *statsRequestPath {
w.Write(statsResponseHeader)
stats.WriteToStream(w)
return false
if fasthttp.EqualBytesStr(h.RequestURI, *statsRequestPath) {
var w bytes.Buffer
stats.WriteToStream(&w)
ctx.Success("text/plain", w.Bytes())
return
}

if req.Header.Get("If-None-Match") != "" {
_, err := w.Write(ifNoneMatchResponseHeader)
if len(h.Peek("If-None-Match")) > 0 {
ctx.Response.Header.StatusCode = 304
ctx.Response.Header.Set("Etag", "W/\"CacheForever\"")
atomic.AddInt64(&stats.IfNoneMatchHitsCount, 1)
return err == nil
return
}

key := append([]byte(getRequestHost(req)), []byte(req.RequestURI)...)
v := keyPool.Get()
if v == nil {
v = make([]byte, 128)
}
key := v.([]byte)
key = append(key[:0], getRequestHost(h)...)
key = append(key, h.RequestURI...)
item, err := cache.GetDeItem(key, time.Second)
if err != nil {
if err != ybc.ErrCacheMiss {
logFatal("Unexpected error when obtaining cache value by key=[%s]: [%s]", key, err)
}

atomic.AddInt64(&stats.CacheMissesCount, 1)
item = fetchFromUpstream(req, key)
item = fetchFromUpstream(h, key)
if item == nil {
w.Write(serviceUnavailableResponseHeader)
return false
ctx.Error("Service unavailable", 503)
return
}
} else {
atomic.AddInt64(&stats.CacheHitsCount, 1)
}
defer item.Close()
keyPool.Put(v)

contentType, err := loadContentType(req, item)
contentType, err := loadContentType(h, item)
if err != nil {
w.Write(internalServerErrorResponseHeader)
return false
ctx.Error("Internal Server Error", 500)
return
}

if _, err = w.Write(okResponseHeader); err != nil {
return false
}
if _, err = fmt.Fprintf(w, "Content-Type: %s\r\nContent-Length: %d\r\n\r\n", contentType, item.Available()); err != nil {
return false
}
var bytesSent int64
if bytesSent, err = item.WriteTo(w); err != nil {
logRequestError(req, "Cannot send file with key=[%s] to client: %s", key, err)
return false
}
atomic.AddInt64(&stats.BytesSentToClients, bytesSent)
return true
ctx.Response.Header.Set("Etag", "W/\"CacheForever\"")
ctx.Response.Header.Set("Cache-Control", "public, max-age=31536000")
ctx.Response.Header.Set("Content-Type", contentType)
buf := item.Value()
buf = buf[len(buf)-item.Available():]
ctx.Response.Body = append(ctx.Response.Body[:0], buf...)
atomic.AddInt64(&stats.BytesSentToClients, int64(len(ctx.Response.Body)))
}

func fetchFromUpstream(req *http.Request, key []byte) *ybc.Item {
upstreamUrl := fmt.Sprintf("%s://%s%s", *upstreamProtocol, *upstreamHost, req.RequestURI)
func fetchFromUpstream(h *fasthttp.RequestHeader, key []byte) *ybc.Item {
upstreamUrl := fmt.Sprintf("%s://%s%s", *upstreamProtocol, *upstreamHost, h.RequestURI)
upstreamReq, err := http.NewRequest("GET", upstreamUrl, nil)
if err != nil {
logRequestError(req, "Cannot create request structure for [%s]: [%s]", key, err)
logRequestError(h, "Cannot create request structure for [%s]: [%s]", key, err)
return nil
}
upstreamReq.Host = getRequestHost(req)
upstreamReq.Host = string(getRequestHost(h))
resp, err := upstreamClient.Do(upstreamReq)
if err != nil {
logRequestError(req, "Cannot make request for [%s]: [%s]", key, err)
logRequestError(h, "Cannot make request for [%s]: [%s]", key, err)
return nil
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
logRequestError(req, "Cannot read response for [%s]: [%s]", key, err)
logRequestError(h, "Cannot read response for [%s]: [%s]", key, err)
return nil
}

if resp.StatusCode != http.StatusOK {
logRequestError(req, "Unexpected status code=%d for the response [%s]", resp.StatusCode, key)
logRequestError(h, "Unexpected status code=%d for the response [%s]", resp.StatusCode, key)
return nil
}

Expand All @@ -317,82 +286,84 @@ func fetchFromUpstream(req *http.Request, key []byte) *ybc.Item {
itemSize := contentLength + len(contentType) + 1
txn, err := cache.NewSetTxn(key, itemSize, ybc.MaxTtl)
if err != nil {
logRequestError(req, "Cannot start set txn for response [%s], itemSize=%d: [%s]", key, itemSize, err)
logRequestError(h, "Cannot start set txn for response [%s], itemSize=%d: [%s]", key, itemSize, err)
return nil
}

if err = storeContentType(req, txn, contentType); err != nil {
if err = storeContentType(h, txn, contentType); err != nil {
txn.Rollback()
return nil
}

n, err := txn.Write(body)
if err != nil {
logRequestError(req, "Cannot read response [%s] body with size=%d to cache: [%s]", key, contentLength, err)
logRequestError(h, "Cannot read response [%s] body with size=%d to cache: [%s]", key, contentLength, err)
txn.Rollback()
return nil
}
if n != contentLength {
logRequestError(req, "Unexpected number of bytes copied=%d from response [%s] to cache. Expected %d", n, key, contentLength)
logRequestError(h, "Unexpected number of bytes copied=%d from response [%s] to cache. Expected %d", n, key, contentLength)
txn.Rollback()
return nil
}
item, err := txn.CommitItem()
if err != nil {
logRequestError(req, "Cannot commit set txn for response [%s], size=%d: [%s]", key, contentLength, err)
logRequestError(h, "Cannot commit set txn for response [%s], size=%d: [%s]", key, contentLength, err)
return nil
}
atomic.AddInt64(&stats.BytesReadFromUpstream, int64(n))
return item
}

func storeContentType(req *http.Request, w io.Writer, contentType string) (err error) {
func storeContentType(h *fasthttp.RequestHeader, w io.Writer, contentType string) (err error) {
strBuf := []byte(contentType)
strSize := len(strBuf)
if strSize > 255 {
logRequestError(req, "Too long content-type=[%s]. Its' length=%d should fit one byte", contentType, strSize)
logRequestError(h, "Too long content-type=[%s]. Its' length=%d should fit one byte", contentType, strSize)
err = fmt.Errorf("Too long content-type")
return
}
var sizeBuf [1]byte
sizeBuf[0] = byte(strSize)
if _, err = w.Write(sizeBuf[:]); err != nil {
logRequestError(req, "Cannot store content-type length in cache: [%s]", err)
logRequestError(h, "Cannot store content-type length in cache: [%s]", err)
return
}
if _, err = w.Write(strBuf); err != nil {
logRequestError(req, "Cannot store content-type string with length=%d in cache: [%s]", strSize, err)
logRequestError(h, "Cannot store content-type string with length=%d in cache: [%s]", strSize, err)
return
}
return
}

func loadContentType(req *http.Request, r io.Reader) (contentType string, err error) {
func loadContentType(h *fasthttp.RequestHeader, r io.Reader) (contentType string, err error) {
var sizeBuf [1]byte
if _, err = r.Read(sizeBuf[:]); err != nil {
logRequestError(req, "Cannot read content-type length from cache: [%s]", err)
logRequestError(h, "Cannot read content-type length from cache: [%s]", err)
return
}
strSize := int(sizeBuf[0])
strBuf := make([]byte, strSize)
if _, err = r.Read(strBuf); err != nil {
logRequestError(req, "Cannot read content-type string with length=%d from cache: [%s]", strSize, err)
logRequestError(h, "Cannot read content-type string with length=%d from cache: [%s]", strSize, err)
return
}
contentType = string(strBuf)
return
}

func getRequestHost(req *http.Request) string {
var upstreamHostBytes []byte

func getRequestHost(h *fasthttp.RequestHeader) []byte {
if *useClientRequestHost {
return req.Host
return h.Host
}
return *upstreamHost
return upstreamHostBytes
}

func logRequestError(req *http.Request, format string, args ...interface{}) {
func logRequestError(h *fasthttp.RequestHeader, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
logMessage("%s - %s - %s - %s. %s", req.RemoteAddr, req.RequestURI, req.Referer(), req.UserAgent(), msg)
logMessage("%s - %s - %s. %s", h.RequestURI, h.Referer, h.UserAgent, msg)
}

func logMessage(format string, args ...interface{}) {
Expand All @@ -405,35 +376,6 @@ func logFatal(format string, args ...interface{}) {
log.Fatalf("%s\n", msg)
}

func ip4ToUint32(ip4 net.IP) uint32 {
return (uint32(ip4[0]) << 24) | (uint32(ip4[1]) << 16) | (uint32(ip4[2]) << 8) | uint32(ip4[3])
}

type PerIpConnTracker struct {
mutex sync.Mutex
perIpConnCount map[uint32]int
}

func (ct *PerIpConnTracker) registerIp(ipUint32 uint32) int {
ct.mutex.Lock()
ct.perIpConnCount[ipUint32] += 1
connCount := ct.perIpConnCount[ipUint32]
ct.mutex.Unlock()
return connCount
}

func (ct *PerIpConnTracker) unregisterIp(ipUint32 uint32) {
ct.mutex.Lock()
ct.perIpConnCount[ipUint32] -= 1
ct.mutex.Unlock()
}

func createPerIpConnTracker() *PerIpConnTracker {
return &PerIpConnTracker{
perIpConnCount: make(map[uint32]int),
}
}

type Stats struct {
CacheHitsCount int64
CacheMissesCount int64
Expand Down

0 comments on commit 07f1098

Please sign in to comment.