rocketmq

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2019 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

Index

Constants

View Source
const (
	BroadCasting = MessageModel(1)
	Clustering   = MessageModel(2)
)
View Source
const (
	NIL                        = rmqError(C.OK)
	ErrNullPoint               = rmqError(C.NULL_POINTER)
	ErrMallocFailed            = rmqError(C.MALLOC_FAILED)
	ErrProducerStartFailed     = rmqError(C.PRODUCER_START_FAILED)
	ErrSendSyncFailed          = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
	ErrSendOnewayFailed        = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
	ErrSendOrderlyFailed       = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
	ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
	ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
	ErrFetchMQFailed           = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
	ErrFetchMessageFailed      = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
)
View Source
const (
	LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL)
	LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR)
	LogLevelWarn  = LogLevel(C.E_LOG_LEVEL_WARN)
	LogLevelInfo  = LogLevel(C.E_LOG_LEVEL_INFO)
	LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG)
	LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE)
	LogLevelNum   = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
)

predefined log level

View Source
const (
	SendOK                = SendStatus(C.E_SEND_OK)
	SendFlushDiskTimeout  = SendStatus(C.E_SEND_FLUSH_DISK_TIMEOUT)
	SendFlushSlaveTimeout = SendStatus(C.E_SEND_FLUSH_SLAVE_TIMEOUT)
	SendSlaveNotAvailable = SendStatus(C.E_SEND_SLAVE_NOT_AVAILABLE)
)
View Source
const (
	PullFound         = PullStatus(C.E_FOUND)
	PullNoNewMsg      = PullStatus(C.E_NO_NEW_MSG)
	PullNoMatchedMsg  = PullStatus(C.E_NO_MATCHED_MSG)
	PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
	PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
)

predefined pull status

View Source
const (
	ConsumeSuccess = ConsumeStatus(C.E_CONSUME_SUCCESS)
	ReConsumeLater = ConsumeStatus(C.E_RECONSUME_LATER)
)
View Source
const GoClientVersion = "Go Client V1.2.0, Support CPP Core:V1.2.X"

Variables

This section is empty.

Functions

func GetVersion

func GetVersion() (version string)

func Version

func Version() (version string)

Types

type ClientConfig

type ClientConfig struct {
	GroupID          string
	NameServer       string
	NameServerDomain string
	GroupName        string
	InstanceName     string
	Credentials      *SessionCredentials
	LogC             *LogConfig
}

func (*ClientConfig) String

func (config *ClientConfig) String() string

type ConsumeStatus

type ConsumeStatus int

func (ConsumeStatus) String

func (status ConsumeStatus) String() string

type LogConfig

type LogConfig struct {
	Path     string
	FileNum  int
	FileSize int64
	Level    LogLevel
}

LogConfig the log configuration for the pull consumer

func (*LogConfig) String

func (lc *LogConfig) String() string

type LogLevel

type LogLevel int

LogLevel the log level

func (LogLevel) String

func (l LogLevel) String() string

type Message

type Message struct {
	Topic          string
	Tags           string
	Keys           string
	Body           string
	DelayTimeLevel int
	Property       map[string]string
}

func (*Message) String

func (msg *Message) String() string

type MessageExt

type MessageExt struct {
	Message
	MessageID                 string
	QueueId                   int
	ReconsumeTimes            int
	StoreSize                 int
	BornTimestamp             int64
	StoreTimestamp            int64
	QueueOffset               int64
	CommitLogOffset           int64
	PreparedTransactionOffset int64
	// contains filtered or unexported fields
}

func (*MessageExt) GetProperty

func (msgExt *MessageExt) GetProperty(key string) string

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageModel

type MessageModel int

func (MessageModel) String

func (mode MessageModel) String() string

type MessageQueue

type MessageQueue struct {
	Topic  string
	Broker string
	ID     int
}

MessageQueue the queue of the message

func (*MessageQueue) String

func (q *MessageQueue) String() string

type MessageQueueSelector

type MessageQueueSelector interface {
	Select(size int, m *Message, arg interface{}) int
}

MessageQueueSelector select one message queue

type Producer

type Producer interface {

	// SendMessageSync send a message with sync
	SendMessageSync(msg *Message) (*SendResult, error)

	// SendMessageOrderly send the message orderly
	SendMessageOrderly(
		msg *Message,
		selector MessageQueueSelector,
		arg interface{},
		autoRetryTimes int) (*SendResult, error)

	// SendMessageOneway send a message with oneway
	SendMessageOneway(msg *Message) error
	// contains filtered or unexported methods
}

func NewProducer

func NewProducer(config *ProducerConfig) (Producer, error)

NewProducer create a new producer with config

type ProducerConfig

type ProducerConfig struct {
	ClientConfig
	SendMsgTimeout int
	CompressLevel  int
	MaxMessageSize int
}

ProducerConfig define a producer

func (*ProducerConfig) String

func (config *ProducerConfig) String() string

type PullConsumer

type PullConsumer interface {

	// Pull returns the messages from the consume queue by specify the offset and the max number
	Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult

	// FetchSubscriptionMessageQueues returns the consume queue of the topic
	FetchSubscriptionMessageQueues(topic string) []MessageQueue
	// contains filtered or unexported methods
}

PullConsumer consumer pulling the message

func NewPullConsumer

func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error)

NewPullConsumer creates a pull consumer

type PullConsumerConfig

type PullConsumerConfig struct {
	ClientConfig
}

PullConsumerConfig the configuration for the pull consumer

func (*PullConsumerConfig) String

func (config *PullConsumerConfig) String() string

type PullResult

type PullResult struct {
	NextBeginOffset int64
	MinOffset       int64
	MaxOffset       int64
	Status          PullStatus
	Messages        []*MessageExt
}

PullResult the pull result

func (*PullResult) String

func (pr *PullResult) String() string

type PullStatus

type PullStatus int

PullStatus pull status

func (PullStatus) String

func (ps PullStatus) String() string

type PushConsumer

type PushConsumer interface {

	// Subscribe a new topic with specify filter expression and consume function.
	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
	// contains filtered or unexported methods
}

func NewPushConsumer

func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error)

NewPushConsumer create a new consumer with config.

type PushConsumerConfig

type PushConsumerConfig struct {
	ClientConfig
	ThreadCount         int
	MessageBatchMaxSize int
	Model               MessageModel
}

PushConsumerConfig define a new consumer.

func (*PushConsumerConfig) String

func (config *PushConsumerConfig) String() string

type SendResult

type SendResult struct {
	Status SendStatus
	MsgId  string
	Offset int64
}

func (*SendResult) String

func (result *SendResult) String() string

type SendStatus

type SendStatus int

func (SendStatus) String

func (status SendStatus) String() string

type SessionCredentials

type SessionCredentials struct {
	AccessKey string
	SecretKey string
	Channel   string
}

func (*SessionCredentials) String

func (session *SessionCredentials) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL