go-timeline

module
v0.0.0-...-8ddde9e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2022 License: MIT

README

go-timeline

a timeline(消息同步模型) service write by golang, thinks aliyun tablestore !!!

depends:

  • mongo: >= 5.0
  • redis: >= 6.2

framework:

architecture

data

mongodb主要有2个文档:

  • chat_sync: 离线消息库(同步库),扩散写。一个消息写N份,当客户端成功拉取消息后,应删除离线消息,释放存储(目前为了调试方便,不会删除)
  • chat_history: 持久化存储库(一般使用mysql,这里为了简单直接使用mongo实现),扩散读,一个消息只写一份。有2个作用:
    • 消息漫游。参考QQ消息漫游功能,可以任意查看几个月前,甚至一年前或者任意时间的历史消息。
    • web支持。因为web没有存储能力(无法缓存timeline同步位点),所以可以不用从 chat_sync 走同步流程,直接通过 chat_history 拉历史消息显示即可。

文档结构:

type Message struct {
	Id      string                 `bson:"id,omitempty"`      // id,非mongo的对象id
	Seq     int64                  `bson:"seq,omitempty"`     // 连续递增序号
	Message map[string]interface{} `bson:"message,omitempty"` // 数据内容
}
  • 离线消息库中:id代表收件人的id,具体的发送者信息、消息内容等需要自己解析Message,timeline服务并不限制存储的结构。
  • 持久存储库:id代表会话(会话关系timeline不存储,需要上游服务自行实现),私聊的会话的id为:samllUserId:bigUserId,群聊会话的ID就是 groupId。故查询的时候,直接按照该规则查询即可。
消息序号生成

timeline 序号生成直接采用 redis 实现,保证同一个id下,seq严格递增即可(且连续)。

example

场景

假设有如下聊天场景:

a -> b: 吃了吗?
b -> a: 吃了

// group_a has member [a, b, c, d]

a -> group_a: 大家好
c -> group_a: 报三围
a -> group_a: 初次见面,多多指教
  • a给b发送私聊消息
  • b回复a
  • a在群中发送一条消息
  • c在群中发送一条消息
  • a在群中发送一条消息

下面介绍主要接口实现。

发消息

message_test.go: TestMessageUseCase_Send:

var (
	user1, user2, user3, user4 = "a", "b", "c", "d"

	group       = "group_a"
)

assert.NoError(t, send(mc, "a", "b", "吃了吗?"))
assert.NoError(t, send(mc, "b", "a", "吃了"))

assert.NoError(t, sendGroup(mc, "a", "group_a", []string{"a", "c", "d"}, "大家好"))
assert.NoError(t, sendGroup(mc, "c", "group_a", []string{"a", "c", "d"}, "报三围"))
assert.NoError(t, sendGroup(mc, "a", "group_a", []string{"a", "c", "d"}, "初次见面,多多指教"))
同步消息(扩散写,存N份)

message_test.go: TestMessageUseCase_GetSyncMessage:

lastRead = 0
msgResult, err := mc.GetSyncMessage(context.Background(), user1, lastRead, math.MaxInt64)
  • user1(a)
message_test.go:132: [seq=20] a -> b: 吃了吗?
message_test.go:132: [seq=21] b -> a: 吃了
message_test.go:132: [seq=22] a -> group_a: 大家好
message_test.go:132: [seq=23] c -> group_a: 报三围
message_test.go:132: [seq=24] a -> group_a: 初次见面,多多指教
  • user2(b)
message_test.go:132: [seq=16] a -> b: 吃了吗?
message_test.go:132: [seq=17] b -> a: 吃了
  • user3(c)
message_test.go:132: [seq=13] a -> group_a: 大家好
message_test.go:132: [seq=14] c -> group_a: 报三围
message_test.go:132: [seq=15] a -> group_a: 初次见面,多多指教
  • user4(d)
message_test.go:132: [seq=13] a -> group_a: 大家好
message_test.go:132: [seq=14] c -> group_a: 报三围
message_test.go:132: [seq=15] a -> group_a: 初次见面,多多指教
查询历史消息(扩散读,只存一份)
lastRead = 0
msgResult, err := mc.GetSingleHistoryMessage(context.Background(), "a", "b", lastRead, 10)
message_test.go:132: [seq=13] a -> b: 吃了吗?
message_test.go:132: [seq=14] b -> a: 吃了
lastRead = 0
msgResult, err := mc.GetGroupHistoryMessage(context.Background(), "group_a", lastRead, 10)
message_test.go:132: [seq=22] a -> group_a: 大家好
message_test.go:132: [seq=23] c -> group_a: 报三围
message_test.go:132: [seq=24] a -> group_a: 初次见面,多多指教

screenhost

mongodb

./doc/img/mongo.jpg

client

see html client:

./doc/img/screenhost.jpg

Usage

grpc
  1. go mod
go get github.com/gomicroim/go-timeline/api@latest
  1. grpc client example by use etcd
package main

import (
	"context"
	"encoding/json"
	"flag"
	"github.com/go-kratos/kratos/contrib/registry/etcd/v2"
	"github.com/go-kratos/kratos/v2/registry"
	"github.com/go-kratos/kratos/v2/transport/grpc"
	v1 "github.com/gomicroim/go-timeline/api/v1"
	clientv3 "go.etcd.io/etcd/client/v3"
	"log"
)

var (
	endpoints = flag.String("etcd", "127.0.0.1:2379", "-etcd=127.0.0.1:2379")
)

func main() {
	flag.Parse()

	// init etcd conn
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints: []string{*endpoints},
	})
	if err != nil {
		panic(err)
	}
	dis := etcd.New(etcdClient)
	log.Println("connect etcd", "etcd", *endpoints)

	// new timeline client
	client, err := NewTimelineClient("go-timeline", dis)
	if err != nil {
		panic(err)
	}

	log.Println("send msg")

	msgData := map[string]interface{}{"text:": "hello"}
	buffer, _ := json.Marshal(msgData)

	// call grpc `send`
	res, err := client.Send(context.Background(), &v1.SendMessageRequest{
		From:    "user_a",
		To:      "user_z",
		Message: string(buffer),
	})
	if err != nil {
		panic(err)
	}
	log.Println("send success,result:", res)
}

func NewTimelineClient(serviceName string, discovery registry.Discovery) (v1.TimelineClient, error) {
	rpcUserEndpoint := "discovery:///" + serviceName
	conn, err := grpc.DialInsecure(context.Background(),
		grpc.WithEndpoint(rpcUserEndpoint),
		grpc.WithDiscovery(discovery),
	)
	if err != nil {
		return nil, err
	}
	return v1.NewTimelineClient(conn), nil
}
  1. discovery
http
/timeline/send

request:

curl -X POST http://localhost:8000/timeline/send \
-H 'Content-Type:application/json' \
-d '{"from":"user_a","to":"user_b","message":"{\"from\": \"user_a\",\"to\":\"user_b\",\"text\":\"在吗?\"}"}'
{
    "from":"user_a",
    "to":"user_b",
    "message":"{\"from\": \"user_a\",\"to\":\"user_b\",\"text\":\"在吗?\"}"
}

response:

{"sequence":"3"}
/timeline/sendGroup

request:

curl -X POST http://localhost:8000/timeline/sendGroup \
-H 'Content-Type:application/json' \
-d '{"group_name":"test_group","group_members":["user_a","user_b","user_c","user_d"],"message":"{\"from\":\"user_a\",\"to\":\"test_group\",\"is_group\":true,\"text\":\"大家好\"}"}'
{
	"group_name": "test_group",
	"group_members": ["user_a", "user_b", "user_c", "user_d"],
	"message": "{\"from\":\"user_a\",\"to\":\"test_group\",\"is_group\":true,\"text\":\"大家好\"}"
}

response:

{"failedMembers":[]}
/timeline/sync

request:

curl -X GET http://localhost:8000/timeline/sync?member=user_a&last_read=0&count=10

response:

{
	"entrySet": [{
		"sequence": "3",
		"message": "{\"from\":\"user_a\",\"text\":\"在吗?\",\"to\":\"user_b\"}"
	}, {
		"sequence": "4",
		"message": "{\"from\":\"user_a\",\"is_group\":true,\"text\":\"大家好\",\"to\":\"test_group\"}"
	}]
}
/timeline/history/single/{from}/{to}

request:

curl -X GET http://localhost:8000/timeline/history/single/user_a/user_b

response:

{
	"entrySet": [{
		"sequence": "7",
		"message": "{\"from\":\"user_a\",\"text\":\"在吗?\",\"to\":\"user_b\"}"
	}]
}
/timeline/history/group/{group}

request:

curl -X GET http://localhost:8000/timeline/history/group/test_group

response:

{
	"entrySet": [{
		"sequence": "1",
		"message": "{\"from\":\"user_a\",\"is_group\":true,\"text\":\"大家好\",\"to\":\"test_group\"}"
	}]
}

Directories

Path Synopsis
api module
cmd
example
internal
biz

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL