Files
metl/action-stdin.go
2021-05-23 22:15:30 -06:00

123 lines
2.8 KiB
Go

package main
import (
"bufio"
"errors"
"fmt"
"os"
"strings"
"github.com/google/uuid"
)
type ActionStdIn struct {
ActionID uuid.UUID
OutputSchema *Schema
Config StdInConfig
status ActionStatus
bus *Bus
}
type StdInConfig struct {
ActionConfig
HasHeader bool
}
func NewStdIn(config StdInConfig, b *Bus) (ActionStdIn, error) {
as := ActionStdIn{}
err := as.init(config, b)
if err != nil {
return ActionStdIn{}, err
}
return as, nil
}
func (s *ActionStdIn) init(config StdInConfig, b *Bus) error {
s.OutputSchema = config.OutputSchema
s.ActionID = config.ActionID
s.Config = config
s.status = ActionStatus{
State: NotStarted,
HandledRows: 0,
ExpectedRows: -1,
}
s.bus = b
return nil
}
func (s *ActionStdIn) Run(_ ContentRow) error {
s.status.State = Running
rowsProcessed := int64(0)
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
potentialRow := scanner.Text()
// TODO: handle headers
if rowsProcessed <= 0 && s.Config.HasHeader {
// currently ignoring the first row always
rowsProcessed = rowsProcessed + 1
continue
}
potentialColumns := strings.Split(potentialRow, ",")
contentRow := ContentRow{
Schema: s.OutputSchema,
Values: make(map[uuid.UUID]string),
Raw: potentialRow,
Source: &s.ActionID,
}
rowValid := true
for i, c := range potentialColumns {
value := strings.TrimSpace(c)
valid, err := ValidateDataType(value, s.OutputSchema.Columns[i].Type)
if err != nil {
rowValid = false
}
rowValid = rowValid && valid
contentRow.Values[s.OutputSchema.Columns[i].ID] = value
}
if !rowValid {
_ = s.publishErrorRow(contentRow)
}
err := s.publishRow(contentRow)
if err != nil {
return errors.New(fmt.Sprintf("error publishing row. ActionID: %v | %v", s.ActionID, err.Error()))
}
rowsProcessed = rowsProcessed + 1
s.status.HandledRows = rowsProcessed
}
s.status.State = Finished
if err := scanner.Err(); err != nil {
return errors.New(fmt.Sprintf("error scanning input. ActionID: %v | %v", s.ActionID, err.Error()))
}
return nil
}
func (s *ActionStdIn) publishRow(row ContentRow) error {
// TODO: decide on the method of transporting data to the next action
return s.bus.Run(s.ActionID, row)
}
func (s *ActionStdIn) publishErrorRow(row ContentRow) error {
// TODO: decide on the method of transporting data to the next action
return nil
}
func (s *ActionStdIn) Validate() error {
if len(s.Config.Inputs) <= 0 {
return errors.New(fmt.Sprintf("error no input schema defined. ActionID: %v", s.ActionID.String()))
}
return nil
}
func (s *ActionStdIn) GetActionID() uuid.UUID {
return s.ActionID
}
func (s *ActionStdIn) Status() ActionStatus {
return s.status
}
func (s *ActionStdIn) GetActionConfig() ActionConfig {
return s.Config.ActionConfig
}