Skip to content

Commit f96b5cb

Browse files
authored
submit workflow logs (#1707)
1 parent f6c69bd commit f96b5cb

1 file changed

Lines changed: 14 additions & 13 deletions

File tree

pkg/sql/executor_ir.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,29 @@ func RunSQLProgram(sqlProgram string, modelDir string, session *pb.Session) *pip
7979
// *pipe.Reader, and remove the calls to log.Printf.
8080
func SubmitWorkflow(sqlProgram string, modelDir string, session *pb.Session) *pipe.Reader {
8181
rd, wr := pipe.Pipe()
82+
startTime := time.Now().Second()
8283
go func() {
8384
defer wr.Close()
84-
err := submitWorkflow(wr, sqlProgram, modelDir, session)
85-
if err != nil {
86-
if err != pipe.ErrClosedPipe {
87-
if err := wr.Write(err); err != nil {
88-
log.Printf("submit workflow error(piping): %v", err)
89-
}
85+
wfID, err := submitWorkflow(wr, sqlProgram, modelDir, session)
86+
defer log.Printf("Submit SQL program: %s\nuserID: %s\nworkflowID: %s\nspent: %d\nerror:%v", sqlProgram, session.UserId, wfID, time.Now().Second()-startTime, err)
87+
if err != nil && err != pipe.ErrClosedPipe {
88+
if err := wr.Write(err); err != nil {
89+
log.Printf("submit workflow error(piping): %v", err)
9090
}
9191
}
92+
9293
}()
9394
return rd
9495
}
9596

96-
func submitWorkflow(wr *pipe.Writer, sqlProgram string, modelDir string, session *pb.Session) error {
97+
func submitWorkflow(wr *pipe.Writer, sqlProgram string, modelDir string, session *pb.Session) (string, error) {
9798
driverName, _, err := database.ParseURL(session.DbConnStr)
9899
if err != nil {
99-
return err
100+
return "", err
100101
}
101102
sqls, err := parser.Parse(driverName, sqlProgram)
102103
if err != nil {
103-
return err
104+
return "", err
104105
}
105106
// TODO(yancey1989): separate the IR generation to multiple steps:
106107
// For example, a TRAIN statement:
@@ -124,7 +125,7 @@ func submitWorkflow(wr *pipe.Writer, sqlProgram string, modelDir string, session
124125
r = &standardSQL
125126
}
126127
if err != nil {
127-
return err
128+
return "", err
128129
}
129130
r.SetOriginalSQL(sql.Original)
130131
spIRs = append(spIRs, r)
@@ -133,17 +134,17 @@ func submitWorkflow(wr *pipe.Writer, sqlProgram string, modelDir string, session
133134
// 1. call codegen_couler.go to generate Argo workflow YAML
134135
argoFileName, err := couler.RunAndWriteArgoFile(spIRs, session)
135136
if err != nil {
136-
return err
137+
return "", err
137138
}
138139
defer os.RemoveAll(argoFileName)
139140

140141
// 2. submit the argo workflow
141142
workflowID, err := argo.Submit(argoFileName)
142143
if err != nil {
143-
return err
144+
return "", err
144145
}
145146

146-
return wr.Write(WorkflowJob{
147+
return workflowID, wr.Write(WorkflowJob{
147148
JobID: workflowID,
148149
})
149150
}

0 commit comments

Comments
 (0)