package main import ( "bufio" "errors" "fmt" "os" "strings" "github.com/google/uuid" ) type ActionStdIn struct { ActionID uuid.UUID OutputSchema *Schema Config StdInConfig status ActionStatus bus *Bus } type StdInConfig struct { ActionConfig HasHeader bool } func NewStdIn(config StdInConfig, b *Bus) (ActionStdIn, error) { as := ActionStdIn{} err := as.init(config, b) if err != nil { return ActionStdIn{}, err } return as, nil } func (s *ActionStdIn) init(config StdInConfig, b *Bus) error { s.OutputSchema = config.OutputSchema s.ActionID = config.ActionID s.Config = config s.status = ActionStatus{ State: NotStarted, HandledRows: 0, ExpectedRows: -1, } s.bus = b return nil } func (s *ActionStdIn) Run(_ ContentRow) error { s.status.State = Running rowsProcessed := int64(0) scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { potentialRow := scanner.Text() // TODO: handle headers if rowsProcessed <= 0 && s.Config.HasHeader { // currently ignoring the first row always rowsProcessed = rowsProcessed + 1 continue } potentialColumns := strings.Split(potentialRow, ",") contentRow := ContentRow{ Schema: s.OutputSchema, Values: make(map[uuid.UUID]string), Raw: potentialRow, Source: &s.ActionID, } rowValid := true for i, c := range potentialColumns { value := strings.TrimSpace(c) valid, err := ValidateDataType(value, s.OutputSchema.Columns[i].Type) if err != nil { rowValid = false } rowValid = rowValid && valid contentRow.Values[s.OutputSchema.Columns[i].ID] = value } if !rowValid { _ = s.publishErrorRow(contentRow) } err := s.publishRow(contentRow) if err != nil { return errors.New(fmt.Sprintf("error publishing row. ActionID: %v | %v", s.ActionID, err.Error())) } rowsProcessed = rowsProcessed + 1 s.status.HandledRows = rowsProcessed } s.status.State = Finished if err := scanner.Err(); err != nil { return errors.New(fmt.Sprintf("error scanning input. ActionID: %v | %v", s.ActionID, err.Error())) } return nil } func (s *ActionStdIn) publishRow(row ContentRow) error { // TODO: decide on the method of transporting data to the next action return s.bus.Run(s.ActionID, row) } func (s *ActionStdIn) publishErrorRow(row ContentRow) error { // TODO: decide on the method of transporting data to the next action return nil } func (s *ActionStdIn) Validate() error { if len(s.Config.Inputs) <= 0 { return errors.New(fmt.Sprintf("error no input schema defined. ActionID: %v", s.ActionID.String())) } return nil } func (s *ActionStdIn) GetActionID() uuid.UUID { return s.ActionID } func (s *ActionStdIn) Status() ActionStatus { return s.status } func (s *ActionStdIn) GetActionConfig() ActionConfig { return s.Config.ActionConfig }