Skip to content

Commit

Permalink
Merge pull request #66 from UlfBj/thread-comm
Browse files Browse the repository at this point in the history
Thread comm refactoring
  • Loading branch information
UlfBj authored Jan 6, 2025
2 parents 6d06eac + b207aea commit 11561a8
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 179 deletions.
8 changes: 4 additions & 4 deletions client/client-1.0/requests.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","requestId":"232"},
{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"},
{"action":"get","path":"Incorrect/Path","requestId":"1957"},
{"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"},
{"action":"get","path":"Vehicle/ADAS","filter":{"type":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"},
{"action":"set", "path":"Vehicle/Cabin/Door/Row1/Right/IsOpen", "value":"999", "requestId":"245"},
{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"type":"timebased","parameter":{"period":"3"}},"requestId":"246"}]}
{"action":"get","path":"Vehicle/ADAS","filter":{"variant":"paths","parameter":["ABS/*","CruiseControl/IsError"]},"requestId":"237"},
{"action":"set", "path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen", "value":"true", "requestId":"245"},
{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","filter":{"variant":"timebased","parameter":{"period":"3"}},"requestId":"246"}]}
107 changes: 69 additions & 38 deletions server/vissv2server/serviceMgr/serviceMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type FeederPathElem struct {
Reference int
}
var feederPathList []FeederPathElem
var feederConnected bool

//var feederConn net.Conn
//var hostIp string
Expand Down Expand Up @@ -126,16 +127,16 @@ var IoTDBConfig = IoTDBConfiguration{

var dummyValue int // dummy value returned when DB configured to none. Counts from 0 to 999, wrap around, updated every 47 msec

func initDataServer(serviceMgrChan chan string, clientChannel chan string, backendChannel chan string) {
func initDataServer(serviceMgrChan chan map[string]interface{}, clientChannel chan map[string]interface{}, backendChannel chan map[string]interface{}) {
for {
select {
case request := <-serviceMgrChan:
utils.Info.Printf("Service mgr request: %s", request)
// utils.Info.Printf("Service mgr request: %s", request)

clientChannel <- request // forward to mgr hub,
if strings.Contains(request, "internal-killsubscriptions") == false { // no response on kill sub
if request["action"] != "internal-killsubscriptions" { // no response on kill sub
response := <-clientChannel // and wait for response
utils.Info.Printf("Service mgr response: %s", response)
// utils.Info.Printf("Service mgr response: %s", response)
serviceMgrChan <- response
}
case notification := <-backendChannel: // notification
Expand Down Expand Up @@ -395,10 +396,6 @@ func compareValues(logicOp string, latestValue string, currentValue string, diff
return false
}

func addPackage(incompleteMessage string, packName string, packValue string) string {
return incompleteMessage[:len(incompleteMessage)-1] + ", \"" + packName + "\":" + packValue + "}"
}

func deactivateSubscription(subscriptionList []SubscriptionState, subscriptionId string) (int, []SubscriptionState) {
id, _ := strconv.Atoi(subscriptionId)
index := getSubcriptionStateIndex(id, subscriptionList)
Expand Down Expand Up @@ -621,6 +618,9 @@ func setVehicleData(path string, value string) string {
utils.Error.Printf("setVehicleData:Write failed, err = %s", err)
return ""
}*/
if !feederConnected {
return ""
}
message := `{"action": "set", "data": {"path":"` + path + `", "dp":{"value":"` + value + `", "ts":"` + ts + `"}}}`
toFeeder <- message
return ts
Expand Down Expand Up @@ -921,6 +921,26 @@ func getDataPack(pathArray []string, filterList []utils.FilterObject) string {
return dataPack
}

func getDataPackMap(pathArray []string) map[string]interface{} {
dataPack := make(map[string]interface{}, 0)
if len(pathArray) > 1 {
dataPackElement := make([]interface{}, len(pathArray))
for i := 0; i < len(pathArray); i++ {
dataPackElement[i] = map[string]interface{}{
"path": pathArray[i],
"dp": string2Map(getVehicleData(pathArray[i]))["s2m"],
}
}
dataPack["dpack"] = dataPackElement
} else {
dataPack["dpack"] = map[string]interface{}{
"path": pathArray[0],
"dp": string2Map(getVehicleData(pathArray[0]))["s2m"],
}
}
return dataPack
}

func getVssPathList(host string, port int, path string) []byte {
url := "http://" + host + ":" + strconv.Itoa(port) + path
utils.Info.Printf("url = %s", url)
Expand Down Expand Up @@ -1011,11 +1031,12 @@ func feederFrontend(toFeeder chan string, fromFeederRorC chan string, fromFeeder
udsConn = utils.GetUdsConn("*", "serverFeeder")
if udsConn == nil && attempts >= 10-1 {
utils.Error.Printf("feederFrontend:Failed to UDS connect to feeder.")
return // ???
return
}
attempts++
time.Sleep(3 * time.Second)
}
feederConnected = true
utils.Info.Printf("feederFrontend:Connected to feeder.")
configureDefault(udsConn)
fromFeeder := make(chan string)
Expand Down Expand Up @@ -1224,9 +1245,11 @@ func feederReader(udsConn net.Conn, fromFeeder chan string) {
}
}

func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) {
//func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) {
func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, stateStorageType string, histSupport bool, dbFile string) {
stateDbType = stateStorageType
historySupport = histSupport
feederConnected = false

utils.ReadUdsRegistrations("uds-registration.json")

Expand Down Expand Up @@ -1330,8 +1353,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
utils.Error.Printf("Unknown state storage type = %s", stateDbType)
}

dataChan := make(chan string)
backendChan := make(chan string)
dataChan := make(chan map[string]interface{})
backendChan := make(chan map[string]interface{})
subscriptionChan := make(chan int)
historyAccessChannel = make(chan string)
initClResources()
Expand Down Expand Up @@ -1360,12 +1383,10 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri

for {
select {
case request := <-dataChan: // request from server core
utils.Info.Printf("Service manager: Request from Server core:%s\n", request)
case requestMap := <-dataChan: // request from server core
// utils.Info.Printf("Service manager: Request from Server core:%s\n", request)
// TODO: interact with underlying subsystem to get the value
var requestMap = make(map[string]interface{})
var responseMap = make(map[string]interface{})
utils.MapRequest(request, &requestMap)
responseMap["RouterId"] = requestMap["RouterId"]
responseMap["action"] = requestMap["action"]
responseMap["requestId"] = requestMap["requestId"]
Expand All @@ -1377,23 +1398,23 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
case "set":
if strings.Contains(requestMap["path"].(string), "[") == true {
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
ts := setVehicleData(requestMap["path"].(string), requestMap["value"].(string))
if len(ts) == 0 {
utils.SetErrorResponse(requestMap, errorResponseMap, 7, "") //service_unavailable
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
responseMap["ts"] = ts
dataChan <- utils.FinalizeMessage(responseMap)
dataChan <- responseMap
case "get":
pathArray := unpackPaths(requestMap["path"].(string))
if pathArray == nil {
utils.Error.Printf("Unmarshal of path array failed.")
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
var filterList []utils.FilterObject
Expand All @@ -1402,32 +1423,33 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
if len(filterList) == 0 {
utils.Error.Printf("Request filter malformed.")
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
}
dataPack := getDataPack(pathArray, filterList)
if len(dataPack) == 0 {
dataPack := getDataPackMap(pathArray)
/* if len(dataPack) == 0 {
utils.Info.Printf("No historic data available")
utils.SetErrorResponse(requestMap, errorResponseMap, 6, "") //unavailable_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
dataChan <- addPackage(utils.FinalizeMessage(responseMap), "data", dataPack)
}*/
responseMap["data"] = dataPack["dpack"]
dataChan <- responseMap
case "subscribe":
var subscriptionState SubscriptionState
subscriptionState.SubscriptionId = subscriptionId
subscriptionState.RouterId = requestMap["RouterId"].(string)
subscriptionState.Path = unpackPaths(requestMap["path"].(string))
if requestMap["filter"] == nil || requestMap["filter"] == "" {
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
utils.UnpackFilter(requestMap["filter"], &(subscriptionState.FilterList))
if len(subscriptionState.FilterList) == 0 {
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
}
if requestMap["gatingId"] != nil {
subscriptionState.GatingId = requestMap["gatingId"].(string)
Expand All @@ -1441,7 +1463,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
toFeeder <- createFeederNotifyMessage(variant, subscriptionState.Path, subscriptionId)
}
subscriptionId++ // not to be incremented elsewhere
dataChan <- utils.FinalizeMessage(responseMap)
dataChan <- responseMap
case "unsubscribe":
if requestMap["subscriptionId"] != nil {
status := -1
Expand All @@ -1450,15 +1472,15 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
status, subscriptionList = deactivateSubscription(subscriptionList, subscriptId)
if status != -1 {
responseMap["subscriptionId"] = subscriptId
dataChan <- utils.FinalizeMessage(responseMap)
toFeeder <- request
dataChan <- responseMap
toFeeder <- utils.FinalizeMessage(requestMap)
break
}
requestMap["subscriptionId"] = subscriptId
}
}
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
case "internal-killsubscriptions":
isRemoved := true
for isRemoved == true {
Expand All @@ -1473,12 +1495,12 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
requestMap["requestId"] = nil
requestMap["subscriptionId"] = subscriptionId
utils.SetErrorResponse(requestMap, errorResponseMap, 2, "Token expired or consent cancelled.")
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
_, subscriptionList = scanAndRemoveListItem(subscriptionList, routerId)
}
default:
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "Unknown action") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
} // switch
case <-dummyTicker.C:
dummyValue++
Expand All @@ -1492,7 +1514,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
subscriptionMap["ts"] = utils.GetRfcTime()
subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId)
subscriptionMap["RouterId"] = subscriptionState.RouterId
backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionState.Path, nil))
subscriptionMap["data"] = getDataPackMap(subscriptionState.Path)["dpack"]
backendChan <- subscriptionMap
case clPack := <-CLChannel: // curve logging notification
index := getSubcriptionStateIndex(clPack.SubscriptionId, subscriptionList)
if index == -1 {
Expand All @@ -1510,7 +1533,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
subscriptionMap["ts"] = utils.GetRfcTime()
subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionList[index].SubscriptionId)
subscriptionMap["RouterId"] = subscriptionList[index].RouterId
backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", clPack.DataPack)
subscriptionMap["data"] = string2Map(clPack.DataPack)["s2m"]
backendChan <- subscriptionMap
case <-subscriptTicker.C:
if feederNotification == false { // feeder does not issue notifications
subscriptionList = checkRCFilterAndIssueMessages("", subscriptionList, backendChan)
Expand All @@ -1526,7 +1550,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
utils.Info.Printf("Service manager exit")
}

func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []SubscriptionState, backendChan chan string) []SubscriptionState {
func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []SubscriptionState, backendChan chan map[string]interface{}) []SubscriptionState {
// check if range or change notification triggered
for i := range subscriptionList {
if len(triggeredPath) == 0 || triggeredPath == subscriptionList[i].Path[0] {
Expand All @@ -1539,13 +1563,20 @@ func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []Subs
subscriptionMap["ts"] = utils.GetRfcTime()
subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId)
subscriptionMap["RouterId"] = subscriptionState.RouterId
backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionList[i].Path, nil))
subscriptionMap["data"] = getDataPackMap(subscriptionList[i].Path)["dpack"]
backendChan <- subscriptionMap
}
}
}
return subscriptionList
}

func string2Map(msg string) map[string]interface{} {
var msgMap map[string]interface{}
utils.MapRequest(`{"s2m":`+msg+"}", &msgMap)
return msgMap
}

func decodeFeederMessage(feederMessage string, feederNotification bool) (string, bool) {
if len(feederMessage) == 0 {
return "", feederNotification
Expand Down
Loading

0 comments on commit 11561a8

Please sign in to comment.