inital working metl
This commit is contained in:
122
action-stdin.go
Normal file
122
action-stdin.go
Normal file
@ -0,0 +1,122 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type ActionStdIn struct {
|
||||
ActionID uuid.UUID
|
||||
OutputSchema *Schema
|
||||
Config StdInConfig
|
||||
status ActionStatus
|
||||
bus *Bus
|
||||
}
|
||||
|
||||
type StdInConfig struct {
|
||||
ActionConfig
|
||||
HasHeader bool
|
||||
}
|
||||
|
||||
func NewStdIn(config StdInConfig, b *Bus) (ActionStdIn, error) {
|
||||
as := ActionStdIn{}
|
||||
err := as.init(config, b)
|
||||
if err != nil {
|
||||
return ActionStdIn{}, err
|
||||
}
|
||||
return as, nil
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) init(config StdInConfig, b *Bus) error {
|
||||
s.OutputSchema = config.OutputSchema
|
||||
s.ActionID = config.ActionID
|
||||
s.Config = config
|
||||
s.status = ActionStatus{
|
||||
State: NotStarted,
|
||||
HandledRows: 0,
|
||||
ExpectedRows: -1,
|
||||
}
|
||||
s.bus = b
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) Run(_ ContentRow) error {
|
||||
s.status.State = Running
|
||||
rowsProcessed := int64(0)
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
for scanner.Scan() {
|
||||
potentialRow := scanner.Text()
|
||||
// TODO: handle headers
|
||||
if rowsProcessed <= 0 && s.Config.HasHeader {
|
||||
// currently ignoring the first row always
|
||||
rowsProcessed = rowsProcessed + 1
|
||||
continue
|
||||
}
|
||||
potentialColumns := strings.Split(potentialRow, ",")
|
||||
contentRow := ContentRow{
|
||||
Schema: s.OutputSchema,
|
||||
Values: make(map[uuid.UUID]string),
|
||||
Raw: potentialRow,
|
||||
Source: &s.ActionID,
|
||||
}
|
||||
rowValid := true
|
||||
for i, c := range potentialColumns {
|
||||
value := strings.TrimSpace(c)
|
||||
valid, err := ValidateDataType(value, s.OutputSchema.Columns[i].Type)
|
||||
if err != nil {
|
||||
rowValid = false
|
||||
}
|
||||
rowValid = rowValid && valid
|
||||
contentRow.Values[s.OutputSchema.Columns[i].ID] = value
|
||||
}
|
||||
if !rowValid {
|
||||
_ = s.publishErrorRow(contentRow)
|
||||
}
|
||||
err := s.publishRow(contentRow)
|
||||
if err != nil {
|
||||
return errors.New(fmt.Sprintf("error publishing row. ActionID: %v | %v", s.ActionID, err.Error()))
|
||||
}
|
||||
rowsProcessed = rowsProcessed + 1
|
||||
s.status.HandledRows = rowsProcessed
|
||||
}
|
||||
s.status.State = Finished
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return errors.New(fmt.Sprintf("error scanning input. ActionID: %v | %v", s.ActionID, err.Error()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) publishRow(row ContentRow) error {
|
||||
// TODO: decide on the method of transporting data to the next action
|
||||
return s.bus.Run(s.ActionID, row)
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) publishErrorRow(row ContentRow) error {
|
||||
// TODO: decide on the method of transporting data to the next action
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) Validate() error {
|
||||
if len(s.Config.Inputs) <= 0 {
|
||||
return errors.New(fmt.Sprintf("error no input schema defined. ActionID: %v", s.ActionID.String()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) GetActionID() uuid.UUID {
|
||||
return s.ActionID
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) Status() ActionStatus {
|
||||
return s.status
|
||||
}
|
||||
|
||||
func (s *ActionStdIn) GetActionConfig() ActionConfig {
|
||||
return s.Config.ActionConfig
|
||||
}
|
Reference in New Issue
Block a user