Files
metl/bus.go
2021-05-23 22:15:30 -06:00

155 lines
4.0 KiB
Go

package main
import (
"errors"
"fmt"
"strings"
"github.com/google/uuid"
)
// MetlConfig is the top-level configuration handed to NewBus.
type MetlConfig struct {
// ActionConfigs holds one entry per action the Bus should create; Bus.init
// walks this list in order and builds an action for each entry.
ActionConfigs []parallelActionConfig `json:"actionConfigs"`
}
// parallelActionConfig is a single config entry that embeds the shared
// ActionConfig side by side with every action-specific config type.
// Bus.init switches on the entry's Type and copies only the fields the chosen
// action needs (e.g. HasHeader for StdIn, AddHeader for StdOut) into that
// action's own config struct.
// NOTE(review): presumably this lets one JSON object describe any action type
// without knowing the type up front -- confirm against the config loader.
type parallelActionConfig struct {
ActionConfig
StdInConfig
ModifyColumnConfig
StdOutConfig
}
// Bus holds the actions built from a MetlConfig and routes rows between them.
type Bus struct {
// actions maps an action's ID to the action built for it in init.
actions map[uuid.UUID]Action
// events maps a producing action's ID to the Run funcs of the actions that
// list it as an input; Bus.Run calls each of them with the published row.
events map[uuid.UUID][]func(contentRow ContentRow) error
}
// NewBus builds a Bus from config by initializing every configured action and
// its event subscriptions. If initialization fails, a zero Bus and the
// initialization error are returned.
func NewBus(config MetlConfig) (Bus, error) {
	var bus Bus
	if initErr := bus.init(config); initErr != nil {
		return Bus{}, initErr
	}
	return bus, nil
}
// wrap prefixes err with message, producing "<message> | <err>".
//
// The returned error wraps err (via %w), so callers can still inspect the
// original with errors.Is / errors.As. A nil err yields nil rather than
// panicking, so wrap can be used directly in a return statement.
func wrap(err error, message string) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("%v | %w", message, err)
}
// init (re)creates the Bus maps, builds one action per entry in
// config.ActionConfigs, and subscribes each action's Run func to the events of
// every action it lists as an input.
//
// It returns an error when an action constructor fails or when an entry has an
// action type this Bus does not know how to build. Inputs without an ActionID
// are skipped, since there is no producing action to subscribe to.
func (b *Bus) init(config MetlConfig) error {
	b.events = make(map[uuid.UUID][]func(contentRow ContentRow) error)
	b.actions = make(map[uuid.UUID]Action)

	for _, ac := range config.ActionConfigs {
		// Set up the event list for this action. It will contain the Run funcs
		// of all actions that depend on this action.
		if _, ok := b.events[ac.ActionID]; !ok {
			b.events[ac.ActionID] = make([]func(contentRow ContentRow) error, 0)
		}

		var action Action
		switch ac.Type {
		case StdIn:
			stdIn, err := NewStdIn(StdInConfig{
				ActionConfig: ac.ActionConfig,
				HasHeader:    ac.HasHeader,
			}, b)
			if err != nil {
				return fmt.Errorf("error initializing StdIn Action. ActionID: %v | %w", ac.ActionID, err)
			}
			action = &stdIn
		case ModifyColumn:
			modCol, err := NewModifyColumn(ModifyColumnConfig{
				ActionConfig: ac.ActionConfig,
			}, b)
			if err != nil {
				return fmt.Errorf("error initializing ModifyColumn Action. ActionID: %v | %w", ac.ActionID, err)
			}
			action = &modCol
		case StdOut:
			stdOut, err := NewStdOut(StdOutConfig{
				ActionConfig: ac.ActionConfig,
				AddHeader:    ac.AddHeader,
			}, b)
			if err != nil {
				return fmt.Errorf("error initializing StdOut Action. ActionID: %v | %w", ac.ActionID, err)
			}
			action = &stdOut
		// TODO: add other actions here
		default:
			// Without this, a nil Action would be stored and the first method
			// call on it (action.Run below, or Validate/Start later) would panic.
			return fmt.Errorf("error initializing Action. ActionID: %v | unsupported action type: %v", ac.ActionID, ac.Type)
		}
		b.actions[ac.ActionID] = action

		// Add this action's Run func to all events it is dependent on.
		for _, input := range ac.Inputs {
			// Check for nil before dereferencing; an input with no ActionID
			// has nothing to subscribe to.
			if input.ActionID == nil {
				continue
			}
			// append handles a missing map entry (nil slice) on its own.
			b.events[*input.ActionID] = append(b.events[*input.ActionID], action.Run)
		}
	}
	return nil
}
// ValidateAll calls Validate on every action held by the Bus. Every action is
// checked even after a failure is found. It returns (true, nil) when all
// actions pass; otherwise it returns false and a single error whose message
// lists each failing action as "<ActionID>: <problem>", joined with " | ".
// The order of the entries follows map iteration and is therefore unspecified.
func (b *Bus) ValidateAll() (bool, error) {
	var failures []string
	for id, act := range b.actions {
		// TODO: check the input schemas against the actions they are coming from
		if vErr := act.Validate(); vErr != nil {
			failures = append(failures, fmt.Sprintf("%v: %v", id, vErr.Error()))
		}
	}
	if len(failures) == 0 {
		return true, nil
	}
	return false, errors.New(strings.Join(failures, " | "))
}
// Start kicks off processing by running every action that has no inputs,
// passing each a zeroed ContentRow. Actions with inputs are not run here; they
// are invoked through Run when an action they depend on publishes a row.
//
// Start stops at the first failing action and returns its error, wrapped with
// the action's ID so callers can still unwrap the underlying cause.
func (b *Bus) Start() error {
	for _, a := range b.actions {
		if len(a.GetActionConfig().Inputs) != 0 {
			continue
		}
		if err := a.Run(ContentRow{}); err != nil {
			return fmt.Errorf("error running Action. ActionID: %v | %w", a.GetActionID(), err)
		}
	}
	return nil
}
// Run delivers row to every Run func subscribed to actionID's events, in the
// order they were registered. An actionID with no subscribers is a no-op.
//
// Run stops at the first subscriber that fails and returns its error, wrapped
// with actionID so callers can still unwrap the underlying cause.
func (b *Bus) Run(actionID uuid.UUID, row ContentRow) error {
	for _, handler := range b.events[actionID] {
		if err := handler(row); err != nil {
			return fmt.Errorf("error running Action. ActionID: %v | %w", actionID, err)
		}
	}
	return nil
}
/*
Bus manages the transport of data from an action's output to all actions that depend on that data.
If an action has no inputs (it does not depend on any other action), it is executed right away by Start using its Run() func
and is passed a zeroed ContentRow.
Otherwise, each action's Run func is stored in a slice within a map keyed by the UUID of the action it depends on. When an
action publishes a row, we loop through the slice of stored funcs that are waiting for data from that action.
*/