Skip to content
This repository has been archived by the owner on Nov 8, 2023. It is now read-only.

Commit

Permalink
更新配置
Browse files Browse the repository at this point in the history
  • Loading branch information
hezhuozhuo committed Oct 11, 2023
1 parent deaf90c commit fb8170f
Showing 1 changed file with 73 additions and 9 deletions.
82 changes: 73 additions & 9 deletions services/tskv_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,15 @@ func (*TSKVService) MsgProcOther(body []byte, topic string) {
// 判断设备是否在线,不在线更新ts_kv_latest表为在线
func checkDeviceOnline(deviceId string) {
// 如果dbType为timescaledb,则不更新ts_kv_latest表
if dbType, _ := web.AppConfig.String("dbType"); dbType == "timescaledb" {
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "timescaledb" {
var count int64
// 判断5秒外设备是否在线
var currentData = time.Now().UnixMicro() - 5000000
Expand Down Expand Up @@ -362,7 +370,14 @@ func (*TSKVService) MsgProc(messages chan<- map[string]interface{}, body []byte,
sendMqtt.SendMQTT(body, topic, 0)
}
// 非系统数据库不需要入库
dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType != "cassandra" {
// 入库
//存入系统时间
Expand Down Expand Up @@ -793,7 +808,14 @@ func (*TSKVService) GetTelemetry(device_ids []string, startTs int64, endTs int64
// 通过设备ID获取一段时间的数据
func (*TSKVService) GetHistoryData(device_id string, attributes []string, startTs int64, endTs int64, rate string) (map[string][]interface{}, error) {
var rsp_map = make(map[string][]interface{})
dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "cassandra" {
// 通过grpc获取数据
request := &pb.GetDeviceAttributesHistoryRequest{
Expand Down Expand Up @@ -923,7 +945,14 @@ func (*TSKVService) Status(device_id string) (*models.TSKVLatest, int64) {

// GetCurrentData 通过设备ID和属性来获取数据
func (*TSKVService) GetCurrentData(device_id string, attributes []string) (map[string]interface{}, error) {
dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "cassandra" {
return fetchFromCassandra(device_id, attributes)
}
Expand Down Expand Up @@ -1205,7 +1234,14 @@ func (*TSKVService) GetCurrentDataByAssetA(asset_id string) map[string]interface
// 根据设备id查询key的历史数据
func (*TSKVService) GetHistoryDataByKey(device_id string, key string, startTs int64, endTs int64, limit int64) (map[string]interface{}, error) {
var rsp_map = make(map[string]interface{})
dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "cassandra" {
// 通过grpc获取数据
request := &pb.GetDeviceHistoryRequest{
Expand Down Expand Up @@ -1334,7 +1370,14 @@ func (*TSKVService) GetCurrentDataAndMap(device_id string, attributes []string)

logs.Info("**********************************************")
var fields []map[string]interface{}
dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "cassandra" {
// 通过grpc获取数据
request := &pb.GetDeviceAttributesCurrentsRequest{
Expand Down Expand Up @@ -1528,7 +1571,14 @@ func (*TSKVService) DeviceOnline(device_id string, interval int64) (string, erro
// 查询设备当前值,与物模型映射,返回map列表
func (*TSKVService) GetCurrentDataAndMapList(device_id string) ([]map[string]interface{}, error) {
var fields []map[string]interface{}
dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "cassandra" {
return fields, errors.New("cassandra不支持此接口")
}
Expand Down Expand Up @@ -1577,7 +1627,14 @@ func (*TSKVService) GetCurrentDataAndMapList(device_id string) ([]map[string]int
// 获取不聚合的数据
func (*TSKVService) GetKVDataWithNoAggregate(deviceId, key string, sTime, eTime int64) ([]map[string]interface{}, error) {

dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "cassandra" {
var fields []map[string]interface{}
request := &pb.GetDeviceKVDataWithNoAggregateRequest{
Expand Down Expand Up @@ -1620,7 +1677,14 @@ func (*TSKVService) GetKVDataWithNoAggregate(deviceId, key string, sTime, eTime
// 获取聚合的数据
func (*TSKVService) GetKVDataWithAggregate(deviceId, key string, sTime, eTime, aggregateWindow int64, aggregateFunc string) ([]map[string]interface{}, error) {

dbType, _ := web.AppConfig.String("dbType")
dbType := os.Getenv("TP_DB_TYPE")
if dbType == "" {
var err error
dbType, err = web.AppConfig.String("dbType")
if err != nil {
dbType = "timescaledb"
}
}
if dbType == "cassandra" {
var fields []map[string]interface{}
request := &pb.GetDeviceKVDataWithAggregateRequest{
Expand Down

0 comments on commit fb8170f

Please sign in to comment.