gateway

package
v0.0.0-...-077b43f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2017 License: Apache-2.0 Imports: 70 Imported by: 5

Documentation

Overview

Package gateway implements the full PubSub gateway features. It's the core of PubSub system.

Index

Constants

View Source
const (
	HttpHeaderXForwardedFor   = "X-Forwarded-For"
	HttpHeaderPartition       = "X-Partition"
	HttpHeaderOffset          = "X-Offset"
	HttpHeaderMsgBury         = "X-Bury"
	HttpHeaderMsgKey          = "X-Key"
	HttpHeaderMsgTag          = "X-Tag"
	HttpHeaderJobId           = "X-Job-Id"
	HttpHeaderAcceptEncoding  = "Accept-Encoding"
	HttpHeaderContentEncoding = "Content-Encoding"
	HttpEncodingGzip          = "gzip"

	UrlParamTopic   = "topic"
	UrlParamVersion = "ver"
	UrlParamAppid   = "appid"
	UrlParamGroup   = "group"

	MaxPartitionKeyLen = 256
)
View Source
const (
	TagMarkStart = byte(1) // FIXME conflicts with ProtocolBuffer
	TagMarkEnd   = byte(2)
	TagSeperator = ";" // follow cookie rules a=b;c=d
)

Variables

View Source
var (
	ResponseOk = []byte(`{"ok":1}`)

	HttpHeaderAppid  = "Appid"
	HttpHeaderPubkey = "Pubkey"
	HttpHeaderSubkey = "Subkey"
)
View Source
var (
	ErrClientGone           = errors.New("remote client gone")
	ErrTooBigMessage        = errors.New("too big message")
	ErrTooSmallMessage      = errors.New("too small message")
	ErrIllegalTaggedMessage = errors.New("illegal tagged message")
	ErrClientKilled         = errors.New("client killed")
	ErrBadResponseWriter    = errors.New("ResponseWriter Close not supported")
	ErrPartitionOutOfRange  = errors.New("partition out of range")
	ErrOffsetOutOfRange     = errors.New("offset out of range")
)
View Source
var (
	Options struct {
		Id                         string
		Zone                       string
		ConfigFile                 string
		PubHttpAddr                string
		PubHttpsAddr               string
		SubHttpAddr                string
		SubHttpsAddr               string
		ManHttpAddr                string
		ManHttpsAddr               string
		DebugHttpAddr              string
		Store                      string
		JobStore                   string
		ManagerStore               string
		PidFile                    string
		CertFile                   string
		KeyFile                    string
		LogFile                    string
		LogLevel                   string
		CrashLogFile               string
		DummyCluster               string
		InfluxServer               string
		InfluxDbName               string
		KillFile                   string
		HintedHandoffType          string
		HintedHandoffDir           string
		AllwaysHintedHandoff       bool
		ShowVersion                bool
		Ratelimit                  bool
		PermitStandbySub           bool
		DisableMetrics             bool
		EnableHintedHandoff        bool
		HintedHandoffBufio         bool
		FlushHintedOffOnly         bool
		BadGroupRateLimit          bool
		BadPubAppRateLimit         bool
		RunSwaggerServer           bool
		AuditPub                   bool
		AuditSub                   bool
		EnableGzip                 bool
		DryRun                     bool
		CpuAffinity                bool
		EnableAccessLog            bool
		EnableHttpPanicRecover     bool
		GolangTrace                bool
		PermitUnregisteredGroup    bool
		UseCompress                bool
		Debug                      bool
		EnableRegistry             bool
		HttpHeaderMaxBytes         int
		MaxPubSize                 int64
		MaxJobSize                 int64
		LogRotateSize              int
		MaxMsgTagLen               int
		MinPubSize                 int
		PubQpsLimit                int64
		MaxSubBatchSize            int
		MaxClients                 int
		MaxRequestPerConn          int // to make load balancer distribute request even for persistent conn
		PubPoolCapcity             int
		AssignJobShardId           int // how to assign shard id for new app
		PubPoolIdleTimeout         time.Duration
		SubTimeout                 time.Duration
		OffsetCommitInterval       time.Duration
		BadClientPunishDuration    time.Duration
		InternalServerErrorBackoff time.Duration
		ReporterInterval           time.Duration
		MetaRefresh                time.Duration
		ManagerRefresh             time.Duration
		HttpReadTimeout            time.Duration
		HttpWriteTimeout           time.Duration
		MaxWaitBeforeForceClose    time.Duration
	}
)

Functions

func AddTagToMessage

func AddTagToMessage(m *mpool.Message, tag string)

┌────────────────────────────┐ ┌────────┐ │TagMarkStart Tag TagMarkEnd │ │Message │ └────────────────────────────┘ └────────┘

func EnsureServerUlimit

func EnsureServerUlimit()

func ExtractMessageTag

func ExtractMessageTag(msg []byte) ([]string, int, error)

func FastListener

func FastListener(gw *Gateway, l net.Listener) net.Listener

func IsTaggedMessage

func IsTaggedMessage(msg []byte) bool

func LimitListener

func LimitListener(name string, gw *Gateway, l net.Listener, n int) net.Listener

LimitListener returns a Listener that accepts at most n simultaneous connections from the provided Listener.

func NewPubMetrics

func NewPubMetrics(gw *Gateway) *pubMetrics

func NewServerMetrics

func NewServerMetrics(interval time.Duration, gw *Gateway) *serverMetrics

func NewSubMetrics

func NewSubMetrics(gw *Gateway) *subMetrics

func ParseFlags

func ParseFlags()

func SetupLogging

func SetupLogging(logFile, level, crashLogFile string)

func ShouldUseForwardedProto

func ShouldUseForwardedProto() bool

ShouldUseForwardedProto returns whether to trust the X-Forwarded-Proto header field. DefaultConfig.HTTPSUseForwardedProto is initialized to this value.

This value depends on the particular environment where the package is built. It is currently true iff build constraint "heroku" is satisfied.

func ValidateFlags

func ValidateFlags()

Types

type AccessLogger

type AccessLogger struct {
	// contains filtered or unexported fields
}

AccessLogger is a daily rotating/unblocking logger to record access log.

func NewAccessLogger

func NewAccessLogger(fn string, poolSize int) *AccessLogger

func (*AccessLogger) Discarded

func (this *AccessLogger) Discarded() uint64

func (*AccessLogger) Log

func (this *AccessLogger) Log(line []byte)

Caution: NEVER call Log after Stop is called.

func (*AccessLogger) Start

func (this *AccessLogger) Start() error

func (*AccessLogger) Stop

func (this *AccessLogger) Stop()

type FramePolicy

type FramePolicy string

FramePolicy tells the browser under what circumstances to allow the response to be displayed inside an HTML frame. There are three options:

Deny            do not permit display in a frame
SameOrigin      permit display in a frame from the same origin
AllowFrom(url)  permit display in a frame from the given url
const (
	Deny       FramePolicy = "DENY"
	SameOrigin FramePolicy = "SAMEORIGIN"
)

func AllowFrom

func AllowFrom(url string) FramePolicy

AllowFrom returns a FramePolicy specifying that the requested resource should be included in a frame from only the given url.

type Gateway

type Gateway struct {
	// contains filtered or unexported fields
}

Gateway is a distributed Pub/Sub HTTP endpoint.

Working with ehaproxy, it can form a Pub/Sub cluster system.

func New

func New(id string) *Gateway

func (*Gateway) InstanceInfo

func (this *Gateway) InstanceInfo() []byte

func (*Gateway) ServeForever

func (this *Gateway) ServeForever()

func (*Gateway) Start

func (this *Gateway) Start() (err error)

type Message

type Message struct {
	Partition int32
	Offset    int64
	Value     []byte
}

func DecodeMessageSet

func DecodeMessageSet(messageSet []byte) []Message

type SecurityConfig

type SecurityConfig struct {
	// If true, redirects any request with scheme http to the equivalent https URL.
	HTTPSRedirect          bool
	HTTPSUseForwardedProto bool

	// Allow cleartext (non-HTTPS) HTTP connections to a loopback
	// address, even if HTTPSRedirect is true.
	PermitClearLoopback bool

	// If true, sets X-Content-Type-Options to "nosniff".
	ContentTypeOptions bool

	// If true, sets the HTTP Strict Transport Security header
	// field, which instructs browsers to send future requests
	// over HTTPS, even if the URL uses the unencrypted http
	// scheme.
	HSTS                  bool
	HSTSMaxAge            time.Duration
	HSTSIncludeSubdomains bool

	// If true, sets X-Frame-Options, to control when the request
	// should be displayed inside an HTML frame.
	FrameOptions       bool
	FrameOptionsPolicy FramePolicy

	// If true, sets X-XSS-Protection to "1", optionally with
	// "mode=block". See the official documentation, linked above,
	// for the meaning of these values.
	XSSProtection      bool
	XSSProtectionBlock bool

	// Used by ServeHTTP, after setting any extra headers, to
	// reply to the request. Next is typically nil, in which case
	// http.DefaultServeMux is used instead.
	Next http.Handler
}

SecurityConfig adds some HTTP header fields widely considered to improve safety of HTTP requests. These fields are documented as follows:

Strict Transport Security: https://tools.ietf.org/html/rfc6797
Frame Options:             https://tools.ietf.org/html/draft-ietf-websec-x-frame-options-00
Cross Site Scripting:      http://msdn.microsoft.com/en-us/library/dd565647%28v=vs.85%29.aspx
Content Type Options:      http://msdn.microsoft.com/en-us/library/ie/gg622941%28v=vs.85%29.aspx

func (*SecurityConfig) ServeHTTP

func (c *SecurityConfig) ServeHTTP(w http.ResponseWriter, r *http.Request)

type SubStatus

type SubStatus struct {
	Appid          string `json:"appid,omitempty"`
	Group          string `json:"group"`
	Topic          string `json:"topic,omitempty"`
	Partition      string `json:"partition"`
	ProducedOldest int64  `json:"pold"`
	ProducedNewest int64  `json:"pubd"`
	Consumed       int64  `json:"subd"`
	ClientRealIP   string `json:"realip"`
}

type WriterWrapper

type WriterWrapper interface {
	http.ResponseWriter

	// Status returns the HTTP status of the request, or 0 if one has not
	// yet been sent.
	Status() int

	// BytesWritten returns the total number of bytes sent to the client.
	BytesWritten() int
}

func SniffWriter

func SniffWriter(w http.ResponseWriter) WriterWrapper

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL