100 lines
2.2 KiB
Go
100 lines
2.2 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type ActionStdOut struct {
|
|
ActionID uuid.UUID
|
|
OutputSchema *Schema
|
|
InputSchema *Schema
|
|
Config StdOutConfig
|
|
status ActionStatus
|
|
bus *Bus
|
|
}
|
|
|
|
type StdOutConfig struct {
|
|
ActionConfig
|
|
AddHeader bool
|
|
}
|
|
|
|
func NewStdOut(config StdOutConfig, b *Bus) (ActionStdOut, error) {
|
|
as := ActionStdOut{}
|
|
err := as.init(config, b)
|
|
if err != nil {
|
|
return ActionStdOut{}, err
|
|
}
|
|
return as, nil
|
|
}
|
|
|
|
func (s *ActionStdOut) init(config StdOutConfig, b *Bus) error {
|
|
if len(config.Inputs) <= 0 {
|
|
return errors.New(fmt.Sprintf("error no input schema defined. ActionID: %v", config.ActionID.String()))
|
|
}
|
|
s.InputSchema = config.Inputs[0].InputSchema
|
|
s.OutputSchema = config.OutputSchema
|
|
s.ActionID = config.ActionID
|
|
s.Config = config
|
|
s.status = ActionStatus{
|
|
State: NotStarted,
|
|
HandledRows: 0,
|
|
ExpectedRows: -1,
|
|
}
|
|
s.bus = b
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdOut) Run(contentRow ContentRow) error {
|
|
s.status.State = Running
|
|
if s.status.HandledRows == 0 && s.Config.AddHeader {
|
|
headerRowRaw := ""
|
|
for _, h := range s.Config.OutputSchema.Columns {
|
|
if strings.EqualFold(headerRowRaw, "") {
|
|
headerRowRaw = h.Name
|
|
} else {
|
|
headerRowRaw = headerRowRaw + "," + h.Name
|
|
}
|
|
}
|
|
fmt.Printf("%v\n", headerRowRaw)
|
|
}
|
|
fmt.Printf("%v\n", contentRow.Raw)
|
|
s.status.HandledRows = s.status.HandledRows + 1
|
|
if s.status.HandledRows == s.status.ExpectedRows {
|
|
s.status.State = Finished
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdOut) publishRow(row ContentRow) error {
|
|
// TODO: decide on the method of transporting data to the next action
|
|
return s.bus.Run(s.ActionID, row)
|
|
}
|
|
|
|
func (s *ActionStdOut) publishErrorRow(row ContentRow) error {
|
|
// TODO: decide on the method of transporting data to the next action
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdOut) Validate() error {
|
|
if len(s.Config.Inputs) <= 0 {
|
|
return errors.New(fmt.Sprintf("error no input schema defined. ActionID: %v", s.ActionID.String()))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdOut) GetActionID() uuid.UUID {
|
|
return s.ActionID
|
|
}
|
|
|
|
func (s *ActionStdOut) Status() ActionStatus {
|
|
return s.status
|
|
}
|
|
|
|
func (s *ActionStdOut) GetActionConfig() ActionConfig {
|
|
return s.Config.ActionConfig
|
|
}
|