123 lines
2.8 KiB
Go
123 lines
2.8 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type ActionStdIn struct {
|
|
ActionID uuid.UUID
|
|
OutputSchema *Schema
|
|
Config StdInConfig
|
|
status ActionStatus
|
|
bus *Bus
|
|
}
|
|
|
|
type StdInConfig struct {
|
|
ActionConfig
|
|
HasHeader bool
|
|
}
|
|
|
|
func NewStdIn(config StdInConfig, b *Bus) (ActionStdIn, error) {
|
|
as := ActionStdIn{}
|
|
err := as.init(config, b)
|
|
if err != nil {
|
|
return ActionStdIn{}, err
|
|
}
|
|
return as, nil
|
|
}
|
|
|
|
func (s *ActionStdIn) init(config StdInConfig, b *Bus) error {
|
|
s.OutputSchema = config.OutputSchema
|
|
s.ActionID = config.ActionID
|
|
s.Config = config
|
|
s.status = ActionStatus{
|
|
State: NotStarted,
|
|
HandledRows: 0,
|
|
ExpectedRows: -1,
|
|
}
|
|
s.bus = b
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdIn) Run(_ ContentRow) error {
|
|
s.status.State = Running
|
|
rowsProcessed := int64(0)
|
|
scanner := bufio.NewScanner(os.Stdin)
|
|
for scanner.Scan() {
|
|
potentialRow := scanner.Text()
|
|
// TODO: handle headers
|
|
if rowsProcessed <= 0 && s.Config.HasHeader {
|
|
// currently ignoring the first row always
|
|
rowsProcessed = rowsProcessed + 1
|
|
continue
|
|
}
|
|
potentialColumns := strings.Split(potentialRow, ",")
|
|
contentRow := ContentRow{
|
|
Schema: s.OutputSchema,
|
|
Values: make(map[uuid.UUID]string),
|
|
Raw: potentialRow,
|
|
Source: &s.ActionID,
|
|
}
|
|
rowValid := true
|
|
for i, c := range potentialColumns {
|
|
value := strings.TrimSpace(c)
|
|
valid, err := ValidateDataType(value, s.OutputSchema.Columns[i].Type)
|
|
if err != nil {
|
|
rowValid = false
|
|
}
|
|
rowValid = rowValid && valid
|
|
contentRow.Values[s.OutputSchema.Columns[i].ID] = value
|
|
}
|
|
if !rowValid {
|
|
_ = s.publishErrorRow(contentRow)
|
|
}
|
|
err := s.publishRow(contentRow)
|
|
if err != nil {
|
|
return errors.New(fmt.Sprintf("error publishing row. ActionID: %v | %v", s.ActionID, err.Error()))
|
|
}
|
|
rowsProcessed = rowsProcessed + 1
|
|
s.status.HandledRows = rowsProcessed
|
|
}
|
|
s.status.State = Finished
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return errors.New(fmt.Sprintf("error scanning input. ActionID: %v | %v", s.ActionID, err.Error()))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdIn) publishRow(row ContentRow) error {
|
|
// TODO: decide on the method of transporting data to the next action
|
|
return s.bus.Run(s.ActionID, row)
|
|
}
|
|
|
|
func (s *ActionStdIn) publishErrorRow(row ContentRow) error {
|
|
// TODO: decide on the method of transporting data to the next action
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdIn) Validate() error {
|
|
if len(s.Config.Inputs) <= 0 {
|
|
return errors.New(fmt.Sprintf("error no input schema defined. ActionID: %v", s.ActionID.String()))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *ActionStdIn) GetActionID() uuid.UUID {
|
|
return s.ActionID
|
|
}
|
|
|
|
func (s *ActionStdIn) Status() ActionStatus {
|
|
return s.status
|
|
}
|
|
|
|
func (s *ActionStdIn) GetActionConfig() ActionConfig {
|
|
return s.Config.ActionConfig
|
|
}
|