orm

package module
v1.79.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2021 License: MIT Imports: 32 Imported by: 1

README

orm

codecov Go Report Card MIT license

ORM that delivers support for full stack data access:

  • MySQL - for relational data
  • Redis - for NoSQL in memory shared cache
  • Elastic Search - for full text search
  • Local Cache - in memory local (not shared) cache
  • ClickHouse - time series database

Menu:

Configuration

First you need to define Registry object and register all connection pools to MySQL, Redis and local cache. Use this object to register queues, and entities. You should create this object once when application starts.

package main

import "github.com/summer-solutions/orm"

func main() {

    registry := &Registry{}

    /*MySQL */
    registry.RegisterMySQLPool("root:root@tcp(localhost:3306)/database_name?limit_connections=10") // you should define max connections, default 100
    //optionally you can define pool name as second argument
    registry.RegisterMySQLPool("root:root@tcp(localhost:3307)/database_name", "second_pool")
    registry.DefaultEncoding("utf8") //optional, default is utf8mb4

    /* Redis */
    registry.RegisterRedis("localhost:6379", 0)
    //optionally you can define pool name as second argument
    registry.RegisterRedis("localhost:6379", 1, "second_pool")

    /* Redis sentinel */
    registry.RegisterRedisSentinel("mymaster", 0, []string{":26379", "192.23.12.33:26379", "192.23.12.35:26379"})
    // redis database number set to 2
    registry.RegisterRedisSentinel("mymaster", 2, []string{":26379", "192.23.12.11:26379", "192.23.12.12:26379"}, "second_pool") 

    /* Local cache (in memory) */
    registry.RegisterLocalCache(1000) //you need to define cache size
    //optionally you can define pool name as second argument
    registry.RegisterLocalCache(100, "second_pool")

    /* Redis used to handle locks (explained later) */
    registry.RegisterRedis("localhost:6379", 4, "lockers_pool")
    registry.RegisterLocker("default", "lockers_pool")

    /* ElasticSearch */
    registry.RegisterElastic("http://127.0.0.1:9200")
    //optionally you can define pool name as second argument
    registry.RegisterElastic("http://127.0.0.1:9200", "second_pool")
    // you can enable trace log
    registry.RegisterElasticWithTraceLog("http://127.0.0.1:9200", "second_pool")

    /* ClickHouse */
    registry.RegisterClickHouse("http://127.0.0.1:9000")
    //optionally you can define pool name as second argument
    registry.RegisterClickHouse("http://127.0.0.1:9000", "second_pool")
}

You can also create registry using yaml configuration file:
default:
    mysql: root:root@tcp(localhost:3310)/db
    mysqlEncoding: utf8 //optional, default is utf8mb4
    redis: localhost:6379:0
    streams:
      stream-1:
        - test-group-1
        - test-group-2
    elastic: http://127.0.0.1:9200
    elastic_trace: http://127.0.0.1:9201 //with trace log
    clickhouse: http://127.0.0.1:9000
    locker: default
    local_cache: 1000
second_pool:
    mysql: root:root@tcp(localhost:3311)/db2
      sentinel:
        master:1:
          - :26379
          - 192.156.23.11:26379
          - 192.156.23.12:26379
package main

import (
    "github.com/summer-solutions/orm"
    "gopkg.in/yaml.v2"
    "io/ioutil"
)

func main() {

    yamlFileData, err := ioutil.ReadFile("./yaml")
    if err != nil {
        //...
    }
    
    var parsedYaml map[string]interface{}
    err = yaml.Unmarshal(yamlFileData, &parsedYaml)
    registry := InitByYaml(parsedYaml)
}

Defining entities

package main

import (
	"github.com/summer-solutions/orm"
	"time"
)

func main() {

    type AddressSchema struct {
        Street   string
        Building uint16
    }
    
    type colors struct {
        Red    string
        Green  string
        Blue   string
        Yellow string
        Purple string
    }
    var Colors = &colors{
        orm.EnumModel,
    	Red:    "Red",
    	Green:  "Green",
    	Blue:   "Blue",
    	Yellow: "Yellow",
    	Purple: "Purple",
    }

    type testEntitySchema struct {
        orm.ORM
        ID                   uint
        Name                 string `orm:"length=100;index=FirstIndex"`
        NameNullable         string `orm:"length=100;index=FirstIndex"`
        BigName              string `orm:"length=max;required"`
        Uint8                uint8  `orm:"unique=SecondIndex:2,ThirdIndex"`
        Uint24               uint32 `orm:"mediumint=true"`
        Uint32               uint32
        Uint32Nullable       *uint32
        Uint64               uint64 `orm:"unique=SecondIndex"`
        Int8                 int8
        Int16                int16
        Int32                int32
        Int64                int64
        Rune                 rune
        Int                  int
        IntNullable          *int
        Bool                 bool
        BoolNullable         *bool
        Float32              float32
        Float64              float64
        Float64Nullable      *float64
        Float32Decimal       float32  `orm:"decimal=8,2"`
        Float64DecimalSigned float64  `orm:"decimal=8,2;unsigned=false"`
        Enum                 string   `orm:"enum=orm.colorEnum"`
        EnumNotNull          string   `orm:"enum=orm.colorEnum;required"`
        Set                  []string `orm:"set=orm.colorEnum"`
        YearNullable         *uint16   `orm:"year=true"`
        YearNotNull          uint16   `orm:"year=true"`
        Date                 *time.Time
        DateNotNull          time.Time
        DateTime             *time.Time `orm:"time=true"`
        DateTimeNotNull      time.Time  `orm:"time=true"`
        Address              AddressSchema
        Json                 interface{}
        ReferenceOne         *testEntitySchemaRef
        ReferenceOneCascade  *testEntitySchemaRef `orm:"cascade"`
        ReferenceMany        []*testEntitySchemaRef
        IgnoreField          []time.Time       `orm:"ignore"`
        Blob                 []byte
        MediumBlob           []byte `orm:"mediumblob=true"`
        LongBlob             []byte `orm:"longblob=true"`
        FieldAsJson          map[string]string
    }
    
    type testEntitySchemaRef struct {
        orm.ORM
        ID   uint
        Name string
    }
    type testEntitySecondPool struct {
    	orm.ORM `orm:"mysql=second_pool"`
    	ID                   uint
    }

    registry := &Registry{}
    var testEntitySchema testEntitySchema
    var testEntitySchemaRef testEntitySchemaRef
    var testEntitySecondPool testEntitySecondPool
    registry.RegisterEntity(testEntitySchema, testEntitySchemaRef, testEntitySecondPool)
    registry.RegisterEnumStruct("color", Colors)

    // now u can use:
    Colors.GetDefault() // "Red" (first field)
    Colors.GetFields() // ["Red", "Blue" ...]
    Colors.GetMapping() // map[string]string{"Red": "Red", "Blue": "Blue"}
    Colors.Has("Red") //true
    Colors.Has("Orange") //false
    
    //or register enum from slice
    registry.RegisterEnumSlice("color", []string{"Red", "Blue"})
    validatedRegistry.GetEnum("color").GetFields()
    validatedRegistry.GetEnum("color").Has("Red")
    
    //or register enum from map
    registry.RegisterEnumMap("color", map[string]string{"red": "Red", "blue": "Blue"}, "red")
}

There are only two golden rules you need to remember defining entity struct:

  • first field must be type of "ORM"
  • second argument must have name "ID" and must be type of one of uint, uint16, uint32, uint24, uint64, rune

By default entity is not cached in local cache or redis, to change that simply use key "redisCache" or "localCache" in "orm" tag for "ORM" field:

package main

import (
	"github.com/summer-solutions/orm"
	"time"
)

func main() {

    type testEntityLocalCache struct {
    	orm.ORM `orm:"localCache"` //default pool
       //...
    }
   
   type testEntityLocalCacheSecondPool struct {
    	orm.ORM `orm:"localCache=second_pool"`
       //...
    }
   
   type testEntityRedisCache struct {
    	orm.ORM `orm:"redisCache"` //default pool
       //...
    }
   
   type testEntityRedisCacheSecondPool struct {
    	orm.ORM `orm:"redisCache=second_pool"`
       //...
    }

   type testEntityLocalAndRedisCache struct {
    	orm.ORM `orm:"localCache;redisCache"`
       //...
    }
}

Validated registry

Once you created your registry and registered all pools and entities you should validate it. You should also run it once when your application starts.

package main

import "github.com/summer-solutions/orm"

func main() {
   registry := &Registry{}
   //register pools and entities
   validatedRegistry, err := registry.Validate()
}

Creating engine

You need to crete engine to start working with entities (searching, saving). You must create engine for each http request and thread.

package main

import "github.com/summer-solutions/orm"

func main() {
   registry := &Registry{}
   //register pools and entities
   validatedRegistry, err := registry.Validate()
   engine := validatedRegistry.CreateEngine()
}

Checking and updating table schema

ORM provides useful object that describes entity structrure called TabelSchema:

package main

import "github.com/summer-solutions/orm"

func main() {
   
   registry := &Registry{}
   // register
   validatedRegistry, err := registry.Validate() 
   engine := validatatedRegistry.CreateEngine()
   alters := engine.GetAlters()
   
   /*optionally you can execute alters for each model*/
   var userEntity UserEntity
   tableSchema := engine.GetRegistry().GetTableSchemaForEntity(userEntity)
   //or
   tableSchema := validatedRegistry.GetTableSchemaForEntity(userEntity)

   /*checking table structure*/
   tableSchema.UpdateSchema(engine) //it will create or alter table if needed
   tableSchema.DropTable(engine) //it will drop table if exist
   tableSchema.TruncateTable(engine)
   tableSchema.UpdateSchemaAndTruncateTable(engine)
   has, alters := tableSchema.GetSchemaChanges(engine)

   /*getting table structure*/
   db := tableSchema.GetMysql(engine)
   localCache, has := tableSchema.GetLocalCache(engine) 
   redisCache, has := tableSchema.GetRedisCache(engine)
   columns := tableSchema.GetColumns()
   tableSchema.GetTableName()
}

Adding, editing, deleting entities

package main

import "github.com/summer-solutions/orm"

func main() {

     /* adding */

    entity := testEntity{Name: "Name 1"}
    engine.Flush(&entity)

    entity2 := testEntity{Name: "Name 1"}
    entity2.SetOnDuplicateKeyUpdate(orm.Bind{"Counter": 2}, entity2)
    engine.Flush(&entity2)

    entity2 = testEntity{Name: "Name 1"}
    engine.SetOnDuplicateKeyUpdate(orm.Bind{}, entity2) //it will change nothing un row
    engine.Flush(&entity)

    /*if you need to add more than one entity*/
    entity = testEntity{Name: "Name 2"}
    entity2 := testEntity{Name: "Name 3"}
    flusher := engine.NewFlusher()
    flusher.Track(&entity, &entity2)
    //it will execute only one query in MySQL adding two rows at once (atomic)
    flusher.Flush()
 
    /* editing */

    flusher := engine.NewFlusher().Track(&entity, &entity2)
    entity.Name = "New name 2"
    //you can also use (but it's slower):
    entity.SetField("Name", "New name 2")
    entity.IsDirty() //returns true
    entity2.IsDirty() //returns false
    flusher.Flush() //it will save data in DB for all dirty tracked entities
    engine.IsDirty(entity) //returns false
    
    /* deleting */
    entity2.Delete()
    //or
    flusher.Delete(&entity, &entity2).Flush()

    /* flush will panic if there is any error. You can catch 2 special errors using this method  */
    err := flusher.FlushWithCheck()
    //or
    err := flusher.FlushInTransactionWithCheck()
    orm.DuplicatedKeyError{} //when unique index is broken
    orm.ForeignKeyError{} //when foreign key is broken
    
    /* You can catch all errors using this method  */
    err := flusher.FlushWithFullCheck()
}

Transactions

package main

import "github.com/summer-solutions/orm"

func main() {
	
    entity = testEntity{Name: "Name 2"}
    entity2 := testEntity{Name: "Name 3"}
    flusher := engine.NewFlusher().Track(&entity, &entity2)

    // DB transcation
    flusher.FlushInTransaction()
    // or redis lock
    flusher.FlushWithLock("default", "lock_name", 10 * time.Second, 10 * time.Second)
    // or DB transcation nad redis lock
    flusher.FlushInTransactionWithLock("default", "lock_name", 10 * time.Second, 10 * time.Second)
 
    //manual transaction
    db := engine.GetMysql()
    db.Begin()
    defer db.Rollback()
    //run queries
    db.Commit()

Loading entities using primary key

package main

import "github.com/summer-solutions/orm"

func main() {

    var entity testEntity
    has := engine.LoadByID(1, &entity)

    var entities []*testEntity
    missing := engine.LoadByIDs([]uint64{1, 3, 4}, &entities) //missing contains IDs that are missing in database

}

package main

import "github.com/summer-solutions/orm"

func main() {

    var entities []*testEntity
    pager := orm.NewPager(1, 1000)
    where := orm.NewWhere("`ID` > ? AND `ID` < ?", 1, 8)
    engine.Search(where, pager, &entities)
    
    //or if you need number of total rows
    totalRows := engine.SearchWithCount(where, pager, &entities)
    
    //or if you need only one row
    where := onm.NewWhere("`Name` = ?", "Hello")
    var entity testEntity
    found := engine.SearchOne(where, &entity)
    
    //or if you need only primary keys
    ids := engine.SearchIDs(where, pager, entity)
    
    //or if you need only primary keys and total rows
    ids, totalRows = engine.SearchIDsWithCount(where, pager, entity)
}

Reference one to one

package main

import "github.com/summer-solutions/orm"

func main() {

    type UserEntity struct {
        ORM
        ID                   uint64
        Name                 string
        School               *SchoolEntity `orm:"required"` // key is "on delete restrict" by default not not nullable
        SecondarySchool      *SchoolEntity // key is nullable
    }
    
    type SchoolEntity struct {
        ORM
        ID                   uint64
        Name                 string
    }

    type UserHouse struct {
        ORM
        ID                   uint64
        User                 *UserEntity  `orm:"cascade;required"` // on delete cascade and is not nullable
    }
    
    // saving in DB:

    user := UserEntity{Name: "John"}
    school := SchoolEntity{Name: "Name of school"}
    house := UserHouse{Name: "Name of school"}
    engine.Track(&user, &school, &house)
    user.School = school
    house.User = user
    engine.Flush()

    // loading references: 

    _ = engine.LoadById(1, &user)
    user.School != nil //returns true, School has ID: 1 but other fields are nof filled
    user.School.ID == 1 //true
    user.School.Loaded() //false
    user.Name == "" //true
    user.School.Load(engine) //it will load school from db
    user.School.Loaded() //now it's true, you can access school fields like user.School.Name
    user.Name == "Name of school" //true
    
    //If you want to set reference and you have only ID:
    user.School = &SchoolEntity{ID: 1}

    // detaching reference
    user.School = nil

    // preloading references
    engine.LoadByID(1, &user, "*") //all references
    engine.LoadByID(1, &user, "School") //only School
    engine.LoadByID(1, &user, "School", "SecondarySchool") //only School and SecondarySchool
    engine.LoadByID(1, &userHouse, "User/School", "User/SecondarySchool") //User, School and SecondarySchool in each User
    engine.LoadByID(1, &userHouse, "User/*") // User, all references in User
    engine.LoadByID(1, &userHouse, "User/*/*") // User, all references in User and all references in User subreferences
    //You can have as many levels you want: User/School/AnotherReference/EvenMore/
    
    //You can preload referenes in all search and load methods:
    engine.LoadByIDs()
    engine.Search()
    engine.SearchOne()
    engine.CachedSearch()
    ...
}

Cached queries

package main

import "github.com/summer-solutions/orm"

func main() {

    //Fields that needs to be tracked for changes should start with ":"

    type UserEntity struct {
        ORM
        ID                   uint64
        Name                 string
        Age                  uint16
        IndexAge             *CachedQuery `query:":Age = ? ORDER BY :ID"`
        IndexAll             *CachedQuery `query:""` //cache all rows
        IndexName            *CachedQuery `queryOne:":Name = ?"`
    }

    pager := orm.NewPager(1, 1000)
    var users []*UserEntity
    var user  UserEntity
    totalRows := engine.CachedSearch(&users, "IndexAge", pager, 18)
    totalRows = engine.CachedSearch(&users, "IndexAll", pager)
    has := engine.CachedSearchOne(&user, "IndexName", "John")

}

Lazy flush

Sometimes you want to flush changes in database, but it's ok if data is flushed after some time. For example when you want to save some logs in database.

package main

import "github.com/summer-solutions/orm"

func main() {
    
    // you need to register redis  
    registry.RegisterRedis("localhost:6379", 0)
    registry.RegisterRedis("localhost:6380", 0, "another_redis")
    
    // .. create engine

    type User struct {
       ORM  `orm:"log"`
       ID   uint
       Name string
       Age  int `orm:"skip-log"` //Don't track this field
    }
   
    // optionally you can set optional redis pool used to queue all events
    type Dog struct {
       ORM  `orm:"asyncRedisLazyFlush=another_redis"`
       ID   uint
       Name string
    }
       
    // now in code you can use FlushLazy() methods instead of Flush().
    // it will send changes to queue (database and cached is not updated yet)
    engine.FlushLazy()
    
    // you need to run code that will read data from queue and execute changes
    // run in separate goroutine (cron script)
    consumer := NewAsyncConsumer(engine, "my-consumer", 1) // you can run maximum one consumer
    consumer.Digest() //It will wait for new messages in a loop, run receiver.DisableLoop() to run loop once

    consumerAnotherPool := NewAsyncConsumer(engine,  "my-consumer", 5) // you can run up to 5 consumers at the same time
    consumerAnotherPool.Digets()
}

Request cache

It's a good practice to cache entities in one short request (e.g. http request) to reduce number of requests to databases.

If you are using more than one goroutine (for example in GraphQL backend implementation) you can enable Data loader in engine to group many queries into one and reduce number of queries. You can read more about idea behind it here.

package main

import "github.com/summer-solutions/orm"

func main() {
    engine.EnableDataLoader(true)
}

Otherwise set false and all entities will be cached in a simple temporary cache:

package main

import "github.com/summer-solutions/orm"

func main() {
    engine.EnableDataLoader(false) 
}

Log entity changes

ORM can store in database every change of entity in special log table.

package main

import "github.com/summer-solutions/orm"

func main() {

    //it's recommended to keep logs in separated DB
    registry.RegisterMySQLPool("root:root@tcp(localhost:3306)/log_database", "log_db_pool")
    // you need to register default Redis   
    registry.RegisterRedis("localhost:6379", 0)
    registry.RegisterRedis("localhost:6380", 0, "another_redis")

    //next you need to define in Entity that you want to log changes. Just add "log" tag or define mysql pool name
    type User struct {
        ORM  `orm:"log"`
        ID   uint
        Name string
        Age  int `orm:"skip-log"` //Don't track this field
    }
    
    // optionally you can set optional redis pool used to queue all events
     type Dog struct {
        ORM  `orm:"log=log_db_pool;asyncRedisLogs=another_redis"`
        ID   uint
        Name string
     }

    // Now every change of User will be saved in log table
    
    // You can add extra data to log, simply use this methods before Flush():
    engine.SetLogMetaData("logged_user_id", 12) 
    engine.SetLogMetaData("ip", request.GetUserIP())
    // you can set meta only in specific entity
    engine.SetEntityLogMeta("user_name", "john", entity)
    
    consumer := NewAsyncConsumer(engine, "my-consumer",  1)
    consumer.Digets() //it will wait for new messages in queue

    consumerAnotherPool := NewAsyncConsumer(engine, "my-consumer", 1)
    consumerAnotherPool.Digets()
}

Dirty stream

You can send event to event broker if any specific data in entity was changed.

package main

import "github.com/summer-solutions/orm"

func main() {

	//define at least one redis pool
    registry.RegisterRedis("localhost:6379", 0, "event-broker-pool")
    //define stream with consumer groups for events
    registry.RegisterRedisStream("user_changed", "event-broker-pool", []string{"my-consumer-group"})
    registry.RegisterRedisStream("age_name_changed", "event-broker-pool", []string{"my-consumer-group"})
    registry.RegisterRedisStream("age_changed", "event-broker-pool", []string{"my-consumer-group"})

    // create engine
    
    // next you need to define in Entity that you want to log changes. Just add "dirty" tag
    type User struct {
        orm.ORM  `orm:"dirty=user_changed"` //define dirty here to track all changes
        ID       uint
        Name     string `orm:"dirty=age_name_changed"` //event will be send to age_name_changed if Name or Age changed
        Age      int `orm:"dirty=age_name_changed,age_changed"` //event will be send to age_changed if Age changed
    }
 
    consumer := engine.GetEventBroker().Consume("my-consumer", "my-consumer-group", 1)

    consumer.Consume(context.Background(), 100, func(events []orm.Event) {
        for _, event := range events {
           dirty := orm.EventDirtyEntity(event) // created wrapper around event to easily access data
           if dirty.Added() {
           	 fmt.Printf("Entity %s with ID %d was added", dirty.TableSchema().GetType().String(), dirty.ID())
           } else if dirty.Updated() {
           	 fmt.Printf("Entity %s with ID %d was updated", dirty.TableSchema().GetType().String(), dirty.ID())
           } else if dirty.Deleted() {
             fmt.Printf("Entity %s with ID %d was deleted", dirty.TableSchema().GetType().String(), dirty.ID())
           }
           event.Ack()
        }
    })
}


Fake delete

If you want to keep deleted entity in database but ny default this entity should be excluded from all engine.Search() and engine.CacheSearch() queries you can use FakeDelete column. Simply create field bool with name "FakeDelete".

func main() {

    type UserEntity struct {
        ORM
        ID                   uint64
        Name                 string
        FakeDelete           bool
    }

    //you can delete in two ways:
    engine.Delete(user) -> will set user.FakeDelete = true
    //or:
    user.FakeDelete = true

    engine.Flush(user) //it will save entity id in Column `FakeDelete`.

    //will return all rows where `FakeDelete` = 0
    total, err = engine.SearchWithCount(NewWhere("1"), nil, &rows)

    //To force delete (remove row from DB):
    engine.ForceDelete(user)
}


Working with Redis

package main

import "github.com/summer-solutions/orm"

func main() {

    config.RegisterRedis("localhost:6379", 0)
    
    //storing data in cached for x seconds
    val := engine.GetRedis().GetSet("key", 1, func() interface{} {
		return "hello"
	})
    
    //standard redis api
    keys := engine.GetRedis().LRange("key", 1, 2)
    engine.GetRedis().LPush("key", "a", "b")
    //...

    //rete limiter
    valid := engine.GetRedis().RateLimit("resource_name", redis_rate.PerMinute(10))
}

Working with local cache

package main

import "github.com/summer-solutions/orm"

func main() {
    
    registry.RegisterLocalCache(1000)
    
    //storing data in cached for x seconds
    val := engine.GetLocalCache().GetSet("key", 1, func() interface{} {
        return "hello"
    })
    
    //getting value
    value, has := engine.GetLocalCache().Get("key")
    
    //getting many values
    values := engine.GetLocalCache().MGet("key1", "key2")
    
    //setting value
    engine.GetLocalCache().Set("key", "value")
    
    //setting values
    engine.GetLocalCache().MSet("key1", "value1", "key2", "value2" /*...*/)
    
    //getting values from hash set (like redis HMGET)
    values = engine.GetLocalCache().HMget("key")
    
    //setting values in hash set
    engine.GetLocalCache().HMset("key", map[string]interface{}{"key1" : "value1", "key2": "value 2"})

    //deleting value
    engine.GetLocalCache().Remove("key1", "key2" /*...*/)
    
    //clearing cache
    engine.GetLocalCache().Clear()

}

Working with mysql

package main

import (
    "database/sql"
    "github.com/summer-solutions/orm"
)

func main() {
    
    // register mysql pool
    registry.RegisterMySQLPool("root:root@tcp(localhost:3306)/database_name")

    res := engine.GetMysql().Exec("UPDATE `table_name` SET `Name` = ? WHERE `ID` = ?", "Hello", 2)

    var row string
    found := engine.GetMysql().QueryRow(orm.NewWhere("SELECT * FROM `table_name` WHERE  `ID` = ?", 1), &row)
    
    results, def := engine.GetMysql().Query("SELECT * FROM `table_name` WHERE  `ID` > ? LIMIT 100", 1)
    defer def()
    for results.Next() {
    	var row string
        results.Scan(&row)
    }
    def() //if it's not last line in this method
}

package main

import (
    "github.com/summer-solutions/orm"
)

func main() {

    type TestIndex struct {
    }
    
    func (i *TestIndex) GetName() string {
    	return "test_index"
    }
    
    func (i *TestIndex) GetDefinition() map[string]interface{} {
        return map[string]interface{}{
            "settings": map[string]interface{}{
                "number_of_replicas": "1",
                "number_of_shards":   "1",
            },
            "mappings": map[string]interface{}{
                "properties": map[string]interface{}{
                    "Name": map[string]interface{}{
                        "type":       "keyword",
                        "normalizer": "case_insensitive",
                    },
                },
            },
        }
    }

    
    // register elastic search pool and index
    registry.RegisterElastic("http://127.0.0.1:9200")
    registry.RegisterElasticIndex(&TestIndex{})


    e := engine.GetElastic()

    // create indices
	for _, alter := range engine.GetElasticIndexAlters() {
        // alter.Safe is true if index does not exists or is not empty
		engine.GetElastic(alter.Pool).CreateIndex(alter.Index)
	}

    query := elastic.NewBoolQuery()
	query.Must(elastic.NewTermQuery("user_id", 12))
    options := &orm.SearchOptions{}
    options.AddSort("created_at", true).AddSort("name", false)
	results := e.Search("users", query, orm.NewPager(1, 10), options)
}

Working with ClickHouse

package main

import (
    "github.com/summer-solutions/orm"
)

func main() {
    
    // register elastic search pool
    registry.RegisterClickHouse("http://127.0.0.1:9000")

    ch := engine.GetClickHouse()

    ch.Exec("INSERT INTO `table` (name) VALUES (?)", "hello")

    statement, def := ch.Prepare("INSERT INTO `table` (name) VALUES (?)")
    defer def()
    statement.Exec("hello")
    statement.Exec("hello 2")

    rows, def := ch.Queryx("SELECT FROM `table` WHERE x = ? AND y = ?", 1, "john")
    defer def()
    for rows.Next() {
    	m := &MyStruct{}
        err := rows.StructScan(m)
    }

    ch.Begin()
    defer ch.Rollback()
    // run queries
    defer ch.Commit()
}

Working with Locker

Shared cached that is using redis

package main

import "github.com/summer-solutions/orm"

func main() {

    // register redis and locker
    registry.RegisterRedis("localhost:6379", 0, "my_pool")
    registry.RegisterLocker("default", "my_pool")
    
    locker, _ := engine.GetLocker()
    lock := locker.Obtain("my_lock", 5 * Time.Second, 1 * Time.Second)

    defer lock.Release()
    
    // do smth
    
    ttl := lock.TTL()
    if ttl == 0 {
        panic("lock lost")
    }
}

Query logging

You can log all queries:

  • queries to MySQL database (insert, select, update)
  • requests to Redis
  • requests to Elastic Search
  • queries to CickHouse
package main

import "github.com/summer-solutions/orm"

func main() {
	
    //enable human friendly console log
    engine.EnableQueryDebug() //MySQL, redis, Elastic Search, ClickHouse queries (local cache in excluded bt default)
    engine.EnableQueryDebug(orm.QueryLoggerSourceRedis, orm.QueryLoggerSourceLocalCache)

    //adding custom logger example:
    engine.AddQueryLogger(json.New(os.Stdout), log.LevelWarn) //MySQL, redis warnings and above
    engine.AddQueryLogger(es.New(os.Stdout), log.LevelError, orm.QueryLoggerSourceRedis)
}    

Logger

package main

import "github.com/summer-solutions/orm"

func main() {
	
    //enable json logger with proper level
    engine.EnableLogger(log.InfoLevel)
    //or enable human friendly console logger
    engine.EnableDebug()
    
    //you can add special fields to all logs
    engine.Log().AddFields(log.Fields{"user_id": 12, "session_id": "skfjhfhhs1221"})

    //printing logs
    engine.Log().Warn("message", nil)
    engine.Log().Debug("message", log.Fields{"user_id": 12})
    engine.Log().Error(err, nil)
    engine.Log().ErrorMessage("ups, that is strange", nil)


    //handling recovery
    if err := recover(); err != nil {
    	engine.Log().Error(err, nil)
    }

    //filling log with data from http.Request
    engine.Log().AddFieldsFromHTTPRequest(request, "197.21.34.22")

}    

Event broker

ORM provides easy way to use event broker.

First yuo need to define streams and consumer groups:

#YAML config file
default:
  redis: localhost:6381:0 // redis is required
  streams:
    stream-1:
      - test-group-1
      - test-group-2
    stream-2:
      - test-group-1
    stream-3:
      - test-group-3

or using go:

package main

import "github.com/summer-solutions/orm"

func main() {
 registry := &orm.Registry{}
 registry.RegisterRedisStream("stream-1", "default", []string{"test-group-1", "test-group-2"})
 registry.RegisterRedisStream("stream-2", "default", []string{"test-group-1"})
 registry.RegisterRedisStream("stream-3", "default", []string{"test-group-3"})
}    

Publishing and receiving events :

package main

import (
 "context"
 "github.com/summer-solutions/orm"
)

func main() {

 // .. create engine

 type Person struct {
  Name string
  Age  int
 }

 // fastest, no serialization
 engine.GetEventBroker().PublishMap("stream-1", orm.EventAsMap{"key": "value", "anotherKey": "value 2"})
 
 // using serialization
 engine.GetEventBroker().Publish("stream-3", Person{Name: "Adam", Age: 18})

 // publishing many at once, recommended because it's much faster than one by one
 flusher :=  engine.GetEventBroker().NewFlusher()
 flusher.PublishMap("stream-1", orm.EventAsMap{"key": "value", "anotherKey": "value 2"})
 flusher.Publish("stream-1", Person{Name: "Adam", Age: 18})
 flusher.Flush()
 
 // reading from "stream-1" and "stream-2" streams, you can run max one consumer at once
 consumerTestGroup1 := engine.GetEventBroker().Consume("my-consumer", "test-group-1", 1)
 
 // reading max 100 events in one loop, this line stop execution, waiting for new events
 consumerTestGroup1.Consume(context.Background(), 100, func(events []orm.Event) {
 	for _, event := range events {
            values := event.RawData() // map[string]interface{}{"key": "value", "anotherKey": "value 2"}
            //do some work
            event.Ack() // this line is acknowledging event
 	}
 })

 // auto acknowledging
 consumerTestGroup1.Consume(context.Background(), 100, func(events []orm.Event) { 
 	//do some work, for example put all events at once to DB
 	// in this example all events will be acknowledge when this method is finished 
 })

 // skipping some events
 consumerTestGroup1.Consume(context.Background(), 100, func(events []orm.Event) {
  for _, event := range events {
        if someCondition {
             event.Ack()
        } else {
             event.Skip() //this event will be consumed again later
        }
    }
 })

 // reading from "stream-3" stream, you can run max to two consumers at once
 consumerTestGroup3 := engine.GetEventBroker().Consume("my-consumer", "test-group-2", 2)
 consumerTestGroup3.DisableLoop() // all events will be consumed once withour waiting for new events   

 consumerTestGroup3.Consume(context.Background(), 100, func(events []orm.Event) {
    var person Person
 	for _, event := range events {
        err := event.Unserialize(&person)
        if err != nil {
        	// ...
        }
        //do some work
        event.Ack() // this line is acknowledging event
    }
 })

}    

Setting another redis pool for AsyncConsumer:

another_pool:
  redis: localhost:6381:0
  streams:
   orm-lazy-channel: # FlushLazy()
      - default-group # group name for AsyncConsumer
    orm-log-channel: # adding changes to logs
      - default-group # group name for AsyncConsumer
      - test-group-2 # you can register another consumers to read logs  

Tools

Redis streams statistics

package main

import "github.com/summer-solutions/orm/tools"

func main() {
   stats := tools.GetRedisStreamsStatistics(engine) 
}    

Documentation

Index

Constants

View Source
const (
	QueryLoggerSourceDB = iota
	QueryLoggerSourceRedis
	QueryLoggerSourceElastic
	QueryLoggerSourceClickHouse
	QueryLoggerSourceLocalCache
	QueryLoggerSourceStreams
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Alter

type Alter struct {
	SQL  string
	Safe bool
	Pool string
}

type AsyncConsumer added in v1.75.3

type AsyncConsumer struct {
	// contains filtered or unexported fields
}

func NewAsyncConsumer added in v1.75.3

func NewAsyncConsumer(engine *Engine, name string) *AsyncConsumer

func (*AsyncConsumer) Digest added in v1.75.3

func (r *AsyncConsumer) Digest(ctx context.Context, count int)

func (*AsyncConsumer) DisableLoop added in v1.75.3

func (r *AsyncConsumer) DisableLoop()

func (*AsyncConsumer) RegisterErrorHandler added in v1.78.5

func (r *AsyncConsumer) RegisterErrorHandler(handler func(err interface{}))

func (*AsyncConsumer) SetHeartBeat added in v1.75.3

func (r *AsyncConsumer) SetHeartBeat(duration time.Duration, beat func())

func (*AsyncConsumer) SetLogLogger added in v1.75.3

func (r *AsyncConsumer) SetLogLogger(logger func(log *LogQueueValue))

type Bind added in v1.75.3

type Bind map[string]interface{}

type CachedQuery

type CachedQuery struct{}

type ClickHouse

type ClickHouse struct {
	// contains filtered or unexported fields
}

func (*ClickHouse) Begin

func (c *ClickHouse) Begin()

func (*ClickHouse) Commit

func (c *ClickHouse) Commit()

func (*ClickHouse) Exec

func (c *ClickHouse) Exec(query string, args ...interface{}) sql.Result

func (*ClickHouse) Prepare

func (c *ClickHouse) Prepare(query string) (preparedStatement *PreparedStatement, deferF func())

func (*ClickHouse) Queryx

func (c *ClickHouse) Queryx(query string, args ...interface{}) (rows *sqlx.Rows, deferF func())

func (*ClickHouse) Rollback

func (c *ClickHouse) Rollback()

type ClickHouseConfig

type ClickHouseConfig struct {
	// contains filtered or unexported fields
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

func (*DB) Begin

func (db *DB) Begin()

func (*DB) Commit

func (db *DB) Commit()

func (*DB) Exec

func (db *DB) Exec(query string, args ...interface{}) ExecResult

func (*DB) GetDatabaseName

func (db *DB) GetDatabaseName() string

func (*DB) GetPoolCode

func (db *DB) GetPoolCode() string

func (*DB) Query

func (db *DB) Query(query string, args ...interface{}) (rows Rows, deferF func())

func (*DB) QueryRow

func (db *DB) QueryRow(query *Where, toFill ...interface{}) (found bool)

func (*DB) Rollback

func (db *DB) Rollback()

type DBConfig

type DBConfig struct {
	// contains filtered or unexported fields
}

type DirtyEntityEvent added in v1.75.3

type DirtyEntityEvent interface {
	ID() uint64
	TableSchema() TableSchema
	Added() bool
	Updated() bool
	Deleted() bool
}

func EventDirtyEntity added in v1.75.3

func EventDirtyEntity(e Event) DirtyEntityEvent

type DuplicatedKeyError

type DuplicatedKeyError struct {
	Message string
	Index   string
}

func (*DuplicatedKeyError) Error

func (err *DuplicatedKeyError) Error() string

type Elastic

type Elastic struct {
	// contains filtered or unexported fields
}

func (*Elastic) Client

func (e *Elastic) Client() *elastic.Client

func (*Elastic) CreateIndex

func (e *Elastic) CreateIndex(index ElasticIndexDefinition)

func (*Elastic) DropIndex

func (e *Elastic) DropIndex(index ElasticIndexDefinition)

func (*Elastic) Search

func (e *Elastic) Search(index string, query elastic.Query, pager *Pager, options *SearchOptions) *elastic.SearchResult

type ElasticConfig

type ElasticConfig struct {
	// contains filtered or unexported fields
}

type ElasticIndexAlter

type ElasticIndexAlter struct {
	Index      ElasticIndexDefinition
	Safe       bool
	Pool       string
	NewMapping map[string]interface{}
	OldMapping map[string]interface{}
}

type ElasticIndexDefinition

type ElasticIndexDefinition interface {
	GetName() string
	GetDefinition() map[string]interface{}
}

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func (*Engine) AddQueryLogger

func (e *Engine) AddQueryLogger(handler logApex.Handler, level logApex.Level, source ...QueryLoggerSource)

func (*Engine) CachedSearch

func (e *Engine) CachedSearch(entities interface{}, indexName string, pager *Pager, arguments ...interface{}) (totalRows int)

func (*Engine) CachedSearchCount added in v1.75.3

func (e *Engine) CachedSearchCount(entity Entity, indexName string, arguments ...interface{}) int

func (*Engine) CachedSearchIDs

func (e *Engine) CachedSearchIDs(entity Entity, indexName string, pager *Pager, arguments ...interface{}) (totalRows int, ids []uint64)

func (*Engine) CachedSearchOne

func (e *Engine) CachedSearchOne(entity Entity, indexName string, arguments ...interface{}) (found bool)

func (*Engine) CachedSearchOneWithReferences

func (e *Engine) CachedSearchOneWithReferences(entity Entity, indexName string, arguments []interface{}, references []string) (found bool)

func (*Engine) CachedSearchWithReferences

func (e *Engine) CachedSearchWithReferences(entities interface{}, indexName string, pager *Pager,
	arguments []interface{}, references []string) (totalRows int)

func (*Engine) ClearByIDs

func (e *Engine) ClearByIDs(entity Entity, ids ...uint64)

func (*Engine) Delete added in v1.75.3

func (e *Engine) Delete(entity Entity)

func (*Engine) DeleteMany added in v1.75.3

func (e *Engine) DeleteMany(entities ...Entity)

func (*Engine) EnableDebug

func (e *Engine) EnableDebug()

func (*Engine) EnableLogger

func (e *Engine) EnableLogger(level logApex.Level, handlers ...logApex.Handler)

func (*Engine) EnableQueryDebug

func (e *Engine) EnableQueryDebug(source ...QueryLoggerSource)

func (*Engine) EnableRequestCache added in v1.75.3

func (e *Engine) EnableRequestCache(goroutines bool)

func (*Engine) Flush

func (e *Engine) Flush(entity Entity)

func (*Engine) FlushLazy

func (e *Engine) FlushLazy(entity Entity)

func (*Engine) FlushLazyMany added in v1.75.3

func (e *Engine) FlushLazyMany(entities ...Entity)

func (*Engine) FlushMany added in v1.75.3

func (e *Engine) FlushMany(entities ...Entity)

func (*Engine) FlushWithCheck

func (e *Engine) FlushWithCheck(entity Entity) error

func (*Engine) FlushWithCheckMany added in v1.75.3

func (e *Engine) FlushWithCheckMany(entities ...Entity) error

func (*Engine) ForceDelete added in v1.75.3

func (e *Engine) ForceDelete(entity Entity)

func (*Engine) GetAlters

func (e *Engine) GetAlters() (alters []Alter)

func (*Engine) GetClickHouse

func (e *Engine) GetClickHouse(code ...string) *ClickHouse

func (*Engine) GetElastic

func (e *Engine) GetElastic(code ...string) *Elastic

func (*Engine) GetElasticIndexAlters

func (e *Engine) GetElasticIndexAlters() (alters []ElasticIndexAlter)

func (*Engine) GetEventBroker added in v1.75.3

func (e *Engine) GetEventBroker() EventBroker

func (*Engine) GetLocalCache

func (e *Engine) GetLocalCache(code ...string) *LocalCache

func (*Engine) GetLocker

func (e *Engine) GetLocker(code ...string) *Locker

func (*Engine) GetMysql

func (e *Engine) GetMysql(code ...string) *DB

func (*Engine) GetRedis

func (e *Engine) GetRedis(code ...string) *RedisCache

func (*Engine) GetRedisSearch added in v1.76.0

func (e *Engine) GetRedisSearch(code ...string) *RedisSearch

func (*Engine) GetRedisSearchIndexAlters added in v1.76.0

func (e *Engine) GetRedisSearchIndexAlters() (alters []RedisSearchIndexAlter)

func (*Engine) GetRegistry

func (e *Engine) GetRegistry() ValidatedRegistry

func (*Engine) Load

func (e *Engine) Load(entity Entity, references ...string)

func (*Engine) LoadByID

func (e *Engine) LoadByID(id uint64, entity Entity, references ...string) (found bool)

func (*Engine) LoadByIDs

func (e *Engine) LoadByIDs(ids []uint64, entities interface{}, references ...string) (missing []uint64)

func (*Engine) Log

func (e *Engine) Log() Log

func (*Engine) NewFastEngine added in v1.75.3

func (e *Engine) NewFastEngine() FastEngine

func (*Engine) NewFlusher added in v1.75.3

func (e *Engine) NewFlusher() Flusher

func (*Engine) NewRedisFlusher added in v1.75.3

func (e *Engine) NewRedisFlusher() RedisFlusher

func (*Engine) RedisSearchIds added in v1.79.0

func (e *Engine) RedisSearchIds(entity Entity, query *RedisSearchQuery, pager *Pager, references ...string) (ids []uint64, totalRows uint64)

func (*Engine) Search

func (e *Engine) Search(where *Where, pager *Pager, entities interface{}, references ...string)

func (*Engine) SearchIDs

func (e *Engine) SearchIDs(where *Where, pager *Pager, entity Entity) []uint64

func (*Engine) SearchIDsWithCount

func (e *Engine) SearchIDsWithCount(where *Where, pager *Pager, entity Entity) (results []uint64, totalRows int)

func (*Engine) SearchOne

func (e *Engine) SearchOne(where *Where, entity Entity, references ...string) (found bool)

func (*Engine) SearchWithCount

func (e *Engine) SearchWithCount(where *Where, pager *Pager, entities interface{}, references ...string) (totalRows int)

func (*Engine) SetLogMetaData

func (e *Engine) SetLogMetaData(key string, value interface{})

type Entity

type Entity interface {
	GetID() uint64

	Loaded() bool
	IsDirty() bool
	GetDirtyBind() (bind Bind, has bool)
	SetOnDuplicateKeyUpdate(bind Bind)
	SetEntityLogMeta(key string, value interface{})
	SetField(field string, value interface{}) error
	// contains filtered or unexported methods
}

type Enum

type Enum interface {
	GetFields() []string
	GetMapping() map[string]string
	GetDefault() string
	Has(value string) bool
	// contains filtered or unexported methods
}

type EnumModel

type EnumModel struct {
	// contains filtered or unexported fields
}

func (*EnumModel) GetDefault

func (enum *EnumModel) GetDefault() string

func (*EnumModel) GetFields

func (enum *EnumModel) GetFields() []string

func (*EnumModel) GetMapping

func (enum *EnumModel) GetMapping() map[string]string

func (*EnumModel) Has

func (enum *EnumModel) Has(value string) bool

type Event added in v1.75.3

type Event interface {
	Ack()
	Skip()
	ID() string
	Stream() string
	RawData() map[string]interface{}
	Unserialize(val interface{}) error
}

type EventAsMap added in v1.75.3

type EventAsMap map[string]interface{}

type EventBroker added in v1.75.3

type EventBroker interface {
	PublishMap(stream string, event EventAsMap) (id string)
	Publish(stream string, event interface{}) (id string)
	Consumer(name, group string) EventsConsumer
	NewFlusher() EventFlusher
}

type EventConsumerHandler added in v1.75.3

type EventConsumerHandler func([]Event)

type EventFlusher added in v1.75.3

type EventFlusher interface {
	PublishMap(stream string, event EventAsMap)
	Publish(stream string, event interface{})
	Flush()
}

type EventsConsumer added in v1.75.3

type EventsConsumer interface {
	Consume(ctx context.Context, count int, blocking bool, handler EventConsumerHandler)
	DisableLoop()
	SetHeartBeat(duration time.Duration, beat func())
}

type ExecResult

type ExecResult interface {
	LastInsertId() uint64
	RowsAffected() uint64
}

type FastEngine added in v1.75.3

type FastEngine interface {
	LoadByID(id uint64, entity Entity, references ...string) (found bool, fastEntity FastEntity)
	LoadByIDs(ids []uint64, entity Entity, references ...string) (result []FastEntity, missing []uint64)
}

type FastEntity added in v1.75.3

type FastEntity interface {
	GetID() uint64
	Get(field string) interface{}
	Fill(entity Entity)
	Is(entity Entity) bool
}

type Flusher added in v1.75.3

type Flusher interface {
	Track(entity ...Entity) Flusher
	Flush()
	FlushWithCheck() error
	FlushInTransactionWithCheck() error
	FlushWithFullCheck() error
	FlushLazy()
	FlushInTransaction()
	FlushWithLock(lockerPool string, lockName string, ttl time.Duration, waitTimeout time.Duration)
	FlushInTransactionWithLock(lockerPool string, lockName string, ttl time.Duration, waitTimeout time.Duration)
	Clear()
	MarkDirty(entity Entity, queueCode string, ids ...uint64)
	Delete(entity ...Entity) Flusher
	ForceDelete(entity ...Entity) Flusher
}

type ForeignKeyError

type ForeignKeyError struct {
	Message    string
	Constraint string
}

func (*ForeignKeyError) Error

func (err *ForeignKeyError) Error() string

type GetSetProvider

type GetSetProvider func() interface{}

type LocalCache

type LocalCache struct {
	// contains filtered or unexported fields
}

func (*LocalCache) Clear

func (c *LocalCache) Clear()

func (*LocalCache) Get

func (c *LocalCache) Get(key string) (value interface{}, ok bool)

func (*LocalCache) GetObjectsCount

func (c *LocalCache) GetObjectsCount() int

func (*LocalCache) GetSet

func (c *LocalCache) GetSet(key string, ttlSeconds int, provider GetSetProvider) interface{}

func (*LocalCache) HMget

func (c *LocalCache) HMget(key string, fields ...string) map[string]interface{}

func (*LocalCache) HMset

func (c *LocalCache) HMset(key string, fields map[string]interface{})

func (*LocalCache) MGet

func (c *LocalCache) MGet(keys ...string) map[string]interface{}

func (*LocalCache) MSet

func (c *LocalCache) MSet(pairs ...interface{})

func (*LocalCache) Remove

func (c *LocalCache) Remove(keys ...string)

func (*LocalCache) Set

func (c *LocalCache) Set(key string, value interface{})

type LocalCacheConfig

type LocalCacheConfig struct {
	// contains filtered or unexported fields
}

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

func (*Lock) Refresh added in v1.75.3

func (l *Lock) Refresh(ctx context.Context, ttl time.Duration) bool

func (*Lock) Release

func (l *Lock) Release()

func (*Lock) TTL

func (l *Lock) TTL() time.Duration

type Locker

type Locker struct {
	// contains filtered or unexported fields
}

func (*Locker) Obtain

func (l *Locker) Obtain(ctx context.Context, key string, ttl time.Duration, waitTimeout time.Duration) (lock *Lock, obtained bool)

type Log

type Log interface {
	AddFields(fields apexLog.Fielder)
	Debug(message string, fields apexLog.Fielder)
	Info(message string, fields apexLog.Fielder)
	Warn(message string, fields apexLog.Fielder)
	Error(err interface{}, fields apexLog.Fielder)
	ErrorMessage(message string, fields apexLog.Fielder)
	AddFieldsFromHTTPRequest(r *http.Request, ip string)
	SetHTTPResponseCode(code int)
}

type LogQueueValue

type LogQueueValue struct {
	PoolName  string
	TableName string
	ID        uint64
	LogID     uint64
	Meta      map[string]interface{}
	Before    map[string]interface{}
	Changes   map[string]interface{}
	Updated   time.Time
}

type ORM

type ORM struct {
	// contains filtered or unexported fields
}

func (*ORM) GetDirtyBind added in v1.75.3

func (orm *ORM) GetDirtyBind() (bind Bind, has bool)

func (*ORM) GetID

func (orm *ORM) GetID() uint64

func (*ORM) IsDirty added in v1.75.3

func (orm *ORM) IsDirty() bool

func (*ORM) Loaded added in v1.75.3

func (orm *ORM) Loaded() bool

func (*ORM) SetEntityLogMeta added in v1.75.3

func (orm *ORM) SetEntityLogMeta(key string, value interface{})

func (*ORM) SetField

func (orm *ORM) SetField(field string, value interface{}) error

func (*ORM) SetOnDuplicateKeyUpdate added in v1.75.3

func (orm *ORM) SetOnDuplicateKeyUpdate(bind Bind)

type Pager

type Pager struct {
	CurrentPage int
	PageSize    int
}

func NewPager

func NewPager(currentPage, pageSize int) *Pager

func (*Pager) GetCurrentPage

func (pager *Pager) GetCurrentPage() int

func (*Pager) GetPageSize

func (pager *Pager) GetPageSize() int

func (*Pager) IncrementPage

func (pager *Pager) IncrementPage()

type PipeLineBool added in v1.76.0

type PipeLineBool struct {
	// contains filtered or unexported fields
}

func (*PipeLineBool) Result added in v1.76.0

func (c *PipeLineBool) Result() (bool, error)

type PipeLineGet added in v1.75.3

type PipeLineGet struct {
	// contains filtered or unexported fields
}

func (*PipeLineGet) Result added in v1.75.3

func (c *PipeLineGet) Result() (value string, has bool, err error)

type PipeLineInt added in v1.75.3

type PipeLineInt struct {
	// contains filtered or unexported fields
}

func (*PipeLineInt) Result added in v1.75.3

func (c *PipeLineInt) Result() (int64, error)

type PipeLineStatus added in v1.75.3

type PipeLineStatus struct {
	// contains filtered or unexported fields
}

func (*PipeLineStatus) Result added in v1.75.3

func (c *PipeLineStatus) Result() error

type PipeLineString added in v1.75.3

type PipeLineString struct {
	// contains filtered or unexported fields
}

func (*PipeLineString) Result added in v1.75.3

func (c *PipeLineString) Result() (string, error)

type PreparedStatement

type PreparedStatement struct {
	// contains filtered or unexported fields
}

func (*PreparedStatement) Exec

func (p *PreparedStatement) Exec(args ...interface{}) sql.Result

type QueryLoggerSource

type QueryLoggerSource int

type RedisCache

type RedisCache struct {
	// contains filtered or unexported fields
}

func (*RedisCache) Del

func (r *RedisCache) Del(keys ...string)

func (*RedisCache) Eval added in v1.75.3

func (r *RedisCache) Eval(script string, keys []string, args ...interface{}) interface{}

func (*RedisCache) EvalSha added in v1.75.3

func (r *RedisCache) EvalSha(sha1 string, keys []string, args ...interface{}) interface{}

func (*RedisCache) Exists added in v1.75.3

func (r *RedisCache) Exists(keys ...string) int64

func (*RedisCache) Expire added in v1.76.0

func (r *RedisCache) Expire(key string, expiration time.Duration) bool

func (*RedisCache) FlushDB

func (r *RedisCache) FlushDB()

func (*RedisCache) Get

func (r *RedisCache) Get(key string) (value string, has bool)

func (*RedisCache) GetSet

func (r *RedisCache) GetSet(key string, ttlSeconds int, provider GetSetProvider) interface{}

func (*RedisCache) HDel added in v1.75.3

func (r *RedisCache) HDel(key string, fields ...string)

func (*RedisCache) HGet added in v1.75.3

func (r *RedisCache) HGet(key, field string) (value string, has bool)

func (*RedisCache) HGetAll

func (r *RedisCache) HGetAll(key string) map[string]string

func (*RedisCache) HIncrBy added in v1.75.3

func (r *RedisCache) HIncrBy(key, field string, incr int64) int64

func (*RedisCache) HLen added in v1.76.0

func (r *RedisCache) HLen(key string) int64

func (*RedisCache) HMget

func (r *RedisCache) HMget(key string, fields ...string) map[string]interface{}

func (*RedisCache) HSet

func (r *RedisCache) HSet(key string, values ...interface{})

func (*RedisCache) Info added in v1.75.3

func (r *RedisCache) Info(section ...string) string

func (*RedisCache) LLen

func (r *RedisCache) LLen(key string) int64

func (*RedisCache) LPush

func (r *RedisCache) LPush(key string, values ...interface{}) int64

func (*RedisCache) LRange

func (r *RedisCache) LRange(key string, start, stop int64) []string

func (*RedisCache) LRem

func (r *RedisCache) LRem(key string, count int64, value interface{})

func (*RedisCache) LSet

func (r *RedisCache) LSet(key string, index int64, value interface{})

func (*RedisCache) Ltrim

func (r *RedisCache) Ltrim(key string, start, stop int64)

func (*RedisCache) MGet

func (r *RedisCache) MGet(keys ...string) map[string]interface{}

func (*RedisCache) MSet

func (r *RedisCache) MSet(pairs ...interface{})

func (*RedisCache) PipeLine added in v1.75.3

func (r *RedisCache) PipeLine() *RedisPipeLine

func (*RedisCache) RPop

func (r *RedisCache) RPop(key string) (value string, found bool)

func (*RedisCache) RPush

func (r *RedisCache) RPush(key string, values ...interface{}) int64

func (*RedisCache) RateLimit

func (r *RedisCache) RateLimit(key string, limit redis_rate.Limit) bool

func (*RedisCache) SAdd

func (r *RedisCache) SAdd(key string, members ...interface{}) int64

func (*RedisCache) SCard

func (r *RedisCache) SCard(key string) int64

func (*RedisCache) SPop

func (r *RedisCache) SPop(key string) (string, bool)

func (*RedisCache) SPopN

func (r *RedisCache) SPopN(key string, max int64) []string

func (*RedisCache) ScriptLoad added in v1.75.3

func (r *RedisCache) ScriptLoad(script string) string

func (*RedisCache) Set

func (r *RedisCache) Set(key string, value interface{}, ttlSeconds int)

func (*RedisCache) Type added in v1.75.3

func (r *RedisCache) Type(key string) string

func (*RedisCache) XAck added in v1.75.3

func (r *RedisCache) XAck(stream, group string, ids ...string) int64

func (*RedisCache) XClaim added in v1.75.3

func (r *RedisCache) XClaim(a *redis.XClaimArgs) []redis.XMessage

func (*RedisCache) XClaimJustID added in v1.75.3

func (r *RedisCache) XClaimJustID(a *redis.XClaimArgs) []string

func (*RedisCache) XDel added in v1.75.3

func (r *RedisCache) XDel(stream string, ids ...string) int64

func (*RedisCache) XGroupCreate added in v1.75.3

func (r *RedisCache) XGroupCreate(stream, group, start string) (key string, exists bool)

func (*RedisCache) XGroupCreateMkStream added in v1.75.3

func (r *RedisCache) XGroupCreateMkStream(stream, group, start string) (key string, exists bool)

func (*RedisCache) XGroupDelConsumer added in v1.75.3

func (r *RedisCache) XGroupDelConsumer(stream, group, consumer string) int64

func (*RedisCache) XGroupDestroy added in v1.75.3

func (r *RedisCache) XGroupDestroy(stream, group string) int64

func (*RedisCache) XInfoGroups added in v1.75.3

func (r *RedisCache) XInfoGroups(stream string) []redis.XInfoGroup

func (*RedisCache) XInfoStream added in v1.75.3

func (r *RedisCache) XInfoStream(stream string) *redis.XInfoStream

func (*RedisCache) XLen added in v1.75.3

func (r *RedisCache) XLen(stream string) int64

func (*RedisCache) XPending added in v1.75.3

func (r *RedisCache) XPending(stream, group string) *redis.XPending

func (*RedisCache) XPendingExt added in v1.75.3

func (r *RedisCache) XPendingExt(a *redis.XPendingExtArgs) []redis.XPendingExt

func (*RedisCache) XRange added in v1.75.3

func (r *RedisCache) XRange(stream, start, stop string, count int64) []redis.XMessage

func (*RedisCache) XRead added in v1.75.3

func (r *RedisCache) XRead(a *redis.XReadArgs) []redis.XStream

func (*RedisCache) XReadGroup added in v1.75.3

func (r *RedisCache) XReadGroup(a *redis.XReadGroupArgs) (streams []redis.XStream)

func (*RedisCache) XRevRange added in v1.75.3

func (r *RedisCache) XRevRange(stream, start, stop string, count int64) []redis.XMessage

func (*RedisCache) XTrim added in v1.75.3

func (r *RedisCache) XTrim(stream string, maxLen int64, approx bool) (deleted int64)

func (*RedisCache) ZAdd

func (r *RedisCache) ZAdd(key string, members ...*redis.Z) int64

func (*RedisCache) ZCard

func (r *RedisCache) ZCard(key string) int64

func (*RedisCache) ZCount

func (r *RedisCache) ZCount(key string, min, max string) int64

func (*RedisCache) ZRangeWithScores

func (r *RedisCache) ZRangeWithScores(key string, start, stop int64) []redis.Z

func (*RedisCache) ZRevRange

func (r *RedisCache) ZRevRange(key string, start, stop int64) []string

func (*RedisCache) ZRevRangeWithScores

func (r *RedisCache) ZRevRangeWithScores(key string, start, stop int64) []redis.Z

func (*RedisCache) ZScore

func (r *RedisCache) ZScore(key, member string) float64

type RedisCacheConfig

type RedisCacheConfig struct {
	// contains filtered or unexported fields
}

type RedisFlusher added in v1.75.3

type RedisFlusher interface {
	Del(redisPool string, keys ...string)
	PublishMap(stream string, event EventAsMap)
	Publish(stream string, event interface{})
	Flush()
	HSet(redisPool, key string, values ...interface{})
}

type RedisPipeLine added in v1.75.3

type RedisPipeLine struct {
	// contains filtered or unexported fields
}

func (*RedisPipeLine) Del added in v1.75.3

func (rp *RedisPipeLine) Del(key ...string) *PipeLineInt

func (*RedisPipeLine) Exec added in v1.75.3

func (rp *RedisPipeLine) Exec()

func (*RedisPipeLine) Executed added in v1.75.3

func (rp *RedisPipeLine) Executed() bool

func (*RedisPipeLine) Expire added in v1.76.0

func (rp *RedisPipeLine) Expire(key string, expiration time.Duration) *PipeLineBool

func (*RedisPipeLine) Get added in v1.75.3

func (rp *RedisPipeLine) Get(key string) *PipeLineGet

func (*RedisPipeLine) HIncrBy added in v1.76.0

func (rp *RedisPipeLine) HIncrBy(key, field string, incr int64) *PipeLineInt

func (*RedisPipeLine) HSet added in v1.77.1

func (rp *RedisPipeLine) HSet(key string, values ...interface{}) *PipeLineInt

func (*RedisPipeLine) Set added in v1.75.3

func (rp *RedisPipeLine) Set(key string, value interface{}, expiration time.Duration) *PipeLineStatus

func (*RedisPipeLine) XAdd added in v1.75.3

func (rp *RedisPipeLine) XAdd(stream string, values interface{}) *PipeLineString

type RedisSearch added in v1.76.0

type RedisSearch struct {
	// contains filtered or unexported fields
}

func (*RedisSearch) ForceReindex added in v1.77.1

func (r *RedisSearch) ForceReindex(index string)

func (*RedisSearch) Info added in v1.78.3

func (r *RedisSearch) Info(indexName string) *RedisSearchIndexInfo

func (*RedisSearch) ListIndices added in v1.78.3

func (r *RedisSearch) ListIndices() []string

func (*RedisSearch) Search added in v1.77.1

func (r *RedisSearch) Search(index string, query *RedisSearchQuery, pager *Pager) (total uint64, rows []*RedisSearchResult)

func (*RedisSearch) SearchKeys added in v1.77.1

func (r *RedisSearch) SearchKeys(index string, query *RedisSearchQuery, pager *Pager) (total uint64, keys []string)

func (*RedisSearch) SearchRaw added in v1.77.1

func (r *RedisSearch) SearchRaw(index string, query *RedisSearchQuery, pager *Pager) (total uint64, rows []interface{})

type RedisSearchIndex added in v1.76.0

type RedisSearchIndex struct {
	Name            string
	RedisPool       string
	Prefixes        []string
	Filter          string
	DefaultLanguage string
	LanguageField   string
	DefaultScore    float64
	ScoreField      string
	PayloadField    string
	MaxTextFields   bool
	Temporary       int
	NoOffsets       bool
	NoNHL           bool
	NoFields        bool
	NoFreqs         bool
	SkipInitialScan bool
	StopWords       []string
	Fields          []RedisSearchIndexField
	Indexer         RedisSearchIndexerFunc
}

func (*RedisSearchIndex) AddGeoField added in v1.76.0

func (rs *RedisSearchIndex) AddGeoField(name string, sortable, noindex bool)

func (*RedisSearchIndex) AddNumericField added in v1.76.0

func (rs *RedisSearchIndex) AddNumericField(name string, sortable, noindex bool)

func (*RedisSearchIndex) AddTagField added in v1.76.0

func (rs *RedisSearchIndex) AddTagField(name string, sortable, noindex bool, separator string)

func (*RedisSearchIndex) AddTextField added in v1.76.0

func (rs *RedisSearchIndex) AddTextField(name string, weight float64, sortable, noindex, nostem bool)

type RedisSearchIndexAlter added in v1.76.0

type RedisSearchIndexAlter struct {
	Query     string
	Executing bool
	Documents uint64
	Changes   []string
	Pool      string
	Execute   func()
	// contains filtered or unexported fields
}

type RedisSearchIndexField added in v1.76.0

type RedisSearchIndexField struct {
	Type         string
	Name         string
	Sortable     bool
	NoIndex      bool
	NoStem       bool
	Weight       float64
	TagSeparator string
}

type RedisSearchIndexInfo added in v1.76.0

type RedisSearchIndexInfo struct {
	Name                     string
	Options                  RedisSearchIndexInfoOptions
	Definition               RedisSearchIndexInfoDefinition
	Fields                   []RedisSearchIndexInfoField
	NumDocs                  uint64
	MaxDocID                 uint64
	NumTerms                 uint64
	NumRecords               uint64
	InvertedSzMB             float64
	TotalInvertedIndexBlocks float64
	OffsetVectorsSzMB        float64
	DocTableSizeMB           float64
	SortableValuesSizeMB     float64
	KeyTableSizeMB           float64
	RecordsPerDocAvg         int
	BytesPerRecordAvg        int
	OffsetsPerTermAvg        float64
	OffsetBitsPerRecordAvg   float64
	HashIndexingFailures     uint64
	Indexing                 bool
	PercentIndexed           float64
	StopWords                []string
}

type RedisSearchIndexInfoDefinition added in v1.76.0

type RedisSearchIndexInfoDefinition struct {
	KeyType       string
	Prefixes      []string
	Filter        string
	LanguageField string
	ScoreField    string
	PayloadField  string
	DefaultScore  float64
}

type RedisSearchIndexInfoField added in v1.76.0

type RedisSearchIndexInfoField struct {
	Name         string
	Type         string
	Weight       float64
	Sortable     bool
	NoStem       bool
	NoIndex      bool
	TagSeparator string
}

type RedisSearchIndexInfoOptions added in v1.77.2

type RedisSearchIndexInfoOptions struct {
	NoFreqs       bool
	NoOffsets     bool
	NoFields      bool
	MaxTextFields bool
}

type RedisSearchIndexPusher added in v1.77.1

type RedisSearchIndexPusher interface {
	NewDocument(key string)
	SetField(key string, value interface{})
	PushDocument()
}

type RedisSearchIndexer added in v1.77.1

type RedisSearchIndexer struct {
	// contains filtered or unexported fields
}

func NewRedisSearchIndexer added in v1.77.1

func NewRedisSearchIndexer(engine *Engine) *RedisSearchIndexer

func (*RedisSearchIndexer) DisableLoop added in v1.77.1

func (r *RedisSearchIndexer) DisableLoop()

func (*RedisSearchIndexer) Run added in v1.77.1

func (r *RedisSearchIndexer) Run(ctx context.Context)

func (*RedisSearchIndexer) SetHeartBeat added in v1.77.1

func (r *RedisSearchIndexer) SetHeartBeat(duration time.Duration, beat func())

type RedisSearchIndexerFunc added in v1.77.1

type RedisSearchIndexerFunc func(lastID uint64, pusher RedisSearchIndexPusher) (newID uint64, hasMore bool)

type RedisSearchQuery added in v1.77.1

type RedisSearchQuery struct {
	// contains filtered or unexported fields
}

func (*RedisSearchQuery) ExplainScore added in v1.77.1

func (q *RedisSearchQuery) ExplainScore() *RedisSearchQuery

func (*RedisSearchQuery) FilterFloat added in v1.77.1

func (q *RedisSearchQuery) FilterFloat(field string, value float64) *RedisSearchQuery

func (*RedisSearchQuery) FilterFloatMinMax added in v1.79.2

func (q *RedisSearchQuery) FilterFloatMinMax(field string, min, max float64) *RedisSearchQuery

func (*RedisSearchQuery) FilterGeo added in v1.77.1

func (q *RedisSearchQuery) FilterGeo(field string, lon, lat, radius float64, unit string) *RedisSearchQuery

func (*RedisSearchQuery) FilterInt added in v1.77.1

func (q *RedisSearchQuery) FilterInt(field string, value int64) *RedisSearchQuery

func (*RedisSearchQuery) FilterIntGreater added in v1.79.2

func (q *RedisSearchQuery) FilterIntGreater(field string, value int64) *RedisSearchQuery

func (*RedisSearchQuery) FilterIntGreaterEqual added in v1.79.2

func (q *RedisSearchQuery) FilterIntGreaterEqual(field string, value int64) *RedisSearchQuery

func (*RedisSearchQuery) FilterIntLess added in v1.79.2

func (q *RedisSearchQuery) FilterIntLess(field string, value int64) *RedisSearchQuery

func (*RedisSearchQuery) FilterIntLessEqual added in v1.79.2

func (q *RedisSearchQuery) FilterIntLessEqual(field string, value int64) *RedisSearchQuery

func (*RedisSearchQuery) FilterIntMinMax added in v1.79.2

func (q *RedisSearchQuery) FilterIntMinMax(field string, min, max int64) *RedisSearchQuery

func (*RedisSearchQuery) Highlight added in v1.77.1

func (q *RedisSearchQuery) Highlight(field ...string) *RedisSearchQuery

func (*RedisSearchQuery) HighlightTags added in v1.77.1

func (q *RedisSearchQuery) HighlightTags(openTag, closeTag string) *RedisSearchQuery

func (*RedisSearchQuery) InFields added in v1.77.1

func (q *RedisSearchQuery) InFields(field ...string) *RedisSearchQuery

func (*RedisSearchQuery) InKeys added in v1.77.1

func (q *RedisSearchQuery) InKeys(key ...string) *RedisSearchQuery

func (*RedisSearchQuery) InOrder added in v1.77.1

func (q *RedisSearchQuery) InOrder() *RedisSearchQuery

func (*RedisSearchQuery) Lang added in v1.77.1

func (q *RedisSearchQuery) Lang(lang string) *RedisSearchQuery

func (*RedisSearchQuery) NoStopWords added in v1.77.1

func (q *RedisSearchQuery) NoStopWords() *RedisSearchQuery

func (*RedisSearchQuery) Query added in v1.77.1

func (q *RedisSearchQuery) Query(query string) *RedisSearchQuery

func (*RedisSearchQuery) Return added in v1.77.1

func (q *RedisSearchQuery) Return(field ...string) *RedisSearchQuery

func (*RedisSearchQuery) Slop added in v1.77.1

func (q *RedisSearchQuery) Slop(slop int) *RedisSearchQuery

func (*RedisSearchQuery) Sort added in v1.77.1

func (q *RedisSearchQuery) Sort(field string, desc bool) *RedisSearchQuery

func (*RedisSearchQuery) Summarize added in v1.77.1

func (q *RedisSearchQuery) Summarize(field ...string) *RedisSearchQuery

func (*RedisSearchQuery) SummarizeOptions added in v1.77.1

func (q *RedisSearchQuery) SummarizeOptions(separator string, frags, len int) *RedisSearchQuery

func (*RedisSearchQuery) Verbatim added in v1.77.1

func (q *RedisSearchQuery) Verbatim() *RedisSearchQuery

func (*RedisSearchQuery) WithPayLoads added in v1.77.1

func (q *RedisSearchQuery) WithPayLoads() *RedisSearchQuery

func (*RedisSearchQuery) WithScores added in v1.77.1

func (q *RedisSearchQuery) WithScores() *RedisSearchQuery

type RedisSearchResult added in v1.77.1

type RedisSearchResult struct {
	Key          string
	Fields       []interface{}
	Score        float64
	ExplainScore []interface{}
	PayLoad      string
}

func (*RedisSearchResult) Value added in v1.77.1

func (r *RedisSearchResult) Value(field string) interface{}

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func InitByYaml

func InitByYaml(yaml map[string]interface{}) (registry *Registry)

func (*Registry) RegisterClickHouse

func (r *Registry) RegisterClickHouse(url string, code ...string)

func (*Registry) RegisterElastic

func (r *Registry) RegisterElastic(url string, code ...string)

func (*Registry) RegisterElasticIndex

func (r *Registry) RegisterElasticIndex(index ElasticIndexDefinition, serverPool ...string)

func (*Registry) RegisterElasticWithTraceLog

func (r *Registry) RegisterElasticWithTraceLog(url string, code ...string)

func (*Registry) RegisterEntity

func (r *Registry) RegisterEntity(entity ...Entity)

func (*Registry) RegisterEnumMap

func (r *Registry) RegisterEnumMap(code string, val map[string]string, defaultValue string)

func (*Registry) RegisterEnumSlice

func (r *Registry) RegisterEnumSlice(code string, val []string)

func (*Registry) RegisterEnumStruct

func (r *Registry) RegisterEnumStruct(code string, val Enum)

func (*Registry) RegisterLocalCache

func (r *Registry) RegisterLocalCache(size int, code ...string)

func (*Registry) RegisterLocker

func (r *Registry) RegisterLocker(code string, redisCode string)

func (*Registry) RegisterMySQLPool

func (r *Registry) RegisterMySQLPool(dataSourceName string, code ...string)

func (*Registry) RegisterRedis

func (r *Registry) RegisterRedis(address string, db int, code ...string)

func (*Registry) RegisterRedisSearchIndex added in v1.76.0

func (r *Registry) RegisterRedisSearchIndex(index ...*RedisSearchIndex)

func (*Registry) RegisterRedisSentinel added in v1.75.3

func (r *Registry) RegisterRedisSentinel(masterName string, db int, sentinels []string, code ...string)

func (*Registry) RegisterRedisStream added in v1.75.3

func (r *Registry) RegisterRedisStream(name string, redisPool string, groups []string)

func (*Registry) SetDefaultEncoding added in v1.75.3

func (r *Registry) SetDefaultEncoding(encoding string)

func (*Registry) Validate

func (r *Registry) Validate() (ValidatedRegistry, error)

type Rows

type Rows interface {
	Next() bool
	Scan(dest ...interface{})
	Columns() []string
}

type SQLRow

type SQLRow interface {
	Scan(dest ...interface{}) error
}

type SQLRows

type SQLRows interface {
	Next() bool
	Err() error
	Close() error
	Scan(dest ...interface{}) error
	Columns() ([]string, error)
}

type SearchOptions

type SearchOptions struct {
	// contains filtered or unexported fields
}

func (*SearchOptions) AddAggregation

func (p *SearchOptions) AddAggregation(name string, aggregation elastic.Aggregation) *SearchOptions

func (*SearchOptions) AddSort

func (p *SearchOptions) AddSort(field string, ascending bool) *SearchOptions

type TableSchema

type TableSchema interface {
	GetTableName() string
	GetType() reflect.Type
	DropTable(engine *Engine)
	TruncateTable(engine *Engine)
	UpdateSchema(engine *Engine)
	UpdateSchemaAndTruncateTable(engine *Engine)
	GetMysql(engine *Engine) *DB
	GetLocalCache(engine *Engine) (cache *LocalCache, has bool)
	GetRedisCache(engine *Engine) (cache *RedisCache, has bool)
	GetReferences() []string
	GetColumns() []string
	GetUsage(registry ValidatedRegistry) map[reflect.Type][]string
	GetSchemaChanges(engine *Engine) (has bool, alters []Alter)
}

type ValidatedRegistry

type ValidatedRegistry interface {
	CreateEngine() *Engine
	GetTableSchema(entityName string) TableSchema
	GetTableSchemaForEntity(entity Entity) TableSchema
	GetSourceRegistry() *Registry
	GetEnum(code string) Enum
	GetEnums() map[string]Enum
	GetRedisStreams() map[string]map[string][]string
	GetRedisPools() []string
	GetRedisSearchIndices() map[string][]*RedisSearchIndex
	GetEntities() map[string]reflect.Type
}

type Where

type Where struct {
	// contains filtered or unexported fields
}

func NewWhere

func NewWhere(query string, parameters ...interface{}) *Where

func (*Where) Append

func (where *Where) Append(query string, parameters ...interface{})

func (*Where) GetParameters

func (where *Where) GetParameters() []interface{}

func (*Where) String

func (where *Where) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL