Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions .github/workflows/frontend.yml

This file was deleted.

15 changes: 13 additions & 2 deletions app/broker/api/exec/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"reflect"

"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -85,9 +86,20 @@ func (e *ExecuteAPI) Register(route gin.IRoutes) {
// @Router /exec [post]
func (e *ExecuteAPI) Execute(c *gin.Context) {
// Hand the statement execution to the query limiter as a closure.
if err := e.deps.QueryLimiter.Do(func() error {
// FIXME: move to common pkg
// Turn a panic raised during execution into a plain-text 500 response
// instead of letting it escape the handler.
defer func() {
if err := recover(); err != nil {
msg := fmt.Sprintf("%v", err)
// Record the panic value on the gin context as an error.
_ = c.Error(errors.New(msg))
c.Header("Content-Type", "text/plain")
c.String(http.StatusInternalServerError, msg)
}
}()
return e.execute(c)
}); err != nil {
// NOTE(review): this text looks like a rendered diff, so the httppkg.Error
// call may be the removed side of the change. If it is still live together
// with c.String below, the error is presumably reported twice - confirm.
httppkg.Error(c, err)
_ = c.Error(err)
c.Header("Content-Type", "text/plain")
c.String(http.StatusInternalServerError, err.Error())
}
}

Expand Down Expand Up @@ -129,7 +141,6 @@ func (e *ExecuteAPI) execute(c *gin.Context) error {
if stmt == nil {
return errors.New("can't parse lin query language")
}

// set query session context
preparedStmt := &tree.PreparedStatement{
Statement: stmt,
Expand Down
7 changes: 7 additions & 0 deletions constants/render.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package constants

// Box-drawing fragments used when rendering a tree as indented text.
const (
// VerticalLine continues a branch downwards past a node.
VerticalLine = "│"
// LastNode is the connector drawn in front of the last child of a node.
LastNode = "└─"
// IntermediateNode is the connector drawn in front of a child that has later siblings.
IntermediateNode = "├─"
)
61 changes: 61 additions & 0 deletions pkg/strutil/indent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package strutil

import (
"strings"

"github.com/lindb/lindb/constants"
)

// Indent holds the prefixes used to render one node of a tree as indented text.
type Indent struct {
// firstLinePrefix is the prefix returned by NodeIndent.
firstLinePrefix string
// nextLinesPrefix is the base prefix for DetailIndent and for the indents of child nodes.
nextLinesPrefix string
// hasChildren makes DetailIndent draw a vertical line when true.
hasChildren bool
}

// NewIndent creates the Indent of a top-level node.
//
// Both the first-line prefix and the following-lines prefix are set to
// `level` spaces; hasChildren records whether the node has children.
func NewIndent(level int, hasChildren bool) *Indent {
	prefix := indentString(level)

	root := new(Indent)
	root.firstLinePrefix = prefix
	root.nextLinesPrefix = prefix
	root.hasChildren = hasChildren
	return root
}

// NodeIndent returns the prefix for the first line of the node.
func (i *Indent) NodeIndent() string {
return i.firstLinePrefix
}

// DetailIndent returns the prefix for the lines that follow the node's first
// line: the following-lines prefix plus a two-column cell, which holds a
// vertical line when the node has children and is blank otherwise.
func (i *Indent) DetailIndent() string {
	if i.hasChildren {
		return i.nextLinesPrefix + pad(constants.VerticalLine, 2)
	}
	return i.nextLinesPrefix + pad("", 2)
}

// ForChild derives the Indent of a child node from its parent's Indent.
//
// last reports whether the child is the final one among its siblings and
// selects the connector in front of it; hasChildren is stored on the result.
// Both prefixes of the result extend the parent's following-lines prefix by a
// three-column cell.
func (i *Indent) ForChild(last, hasChildren bool) *Indent {
	// Default to a child that has later siblings: the branch continues below it.
	connector, continuation := constants.IntermediateNode, constants.VerticalLine
	if last {
		// Nothing follows the last child, so its continuation cell is blank.
		connector, continuation = constants.LastNode, ""
	}
	return &Indent{
		firstLinePrefix: i.nextLinesPrefix + pad(connector, 3),
		nextLinesPrefix: i.nextLinesPrefix + pad(continuation, 3),
		hasChildren:     hasChildren,
	}
}

// indentString returns a string made of `indent` space characters.
func indentString(indent int) string {
	const space = " "
	return strings.Repeat(space, indent)
}

// pad right-pads text with spaces until it is `length` runes wide.
//
// Width is counted in runes, not bytes, so multi-byte box-drawing characters
// occupy one column each. Text that is already `length` runes or wider is
// returned unchanged; without this guard strings.Repeat would panic on the
// negative count.
func pad(text string, length int) string {
	missing := length - len([]rune(text))
	if missing <= 0 {
		return text
	}
	return text + strings.Repeat(" ", missing)
}
4 changes: 2 additions & 2 deletions sql/execution/buffer/result_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,12 @@ func (rsb *ResultSetBuild) Process() {
columns[i] = timeSeries.GetValue()
}
case types.DTTimestamp:
// FIXME: maybe timestamp is nil
columns[i] = row.GetTimestamp(i).UnixMilli()
case types.DTDuration:
columns[i] = row.GetDuration(i)
default:
panic(fmt.Sprintf("build result set error, column:%v, unknown data type:%v", meta.Name, meta.DataType))
panic(fmt.Sprintf("build resultset error, column:%v, unknown data type:%v", meta.Name, meta.DataType))
}
}
rsb.resultSet.Rows = append(rsb.resultSet.Rows, columns)
Expand All @@ -130,6 +131,5 @@ func (rsb *ResultSetBuild) ResultSet() *model.ResultSet {
// waiting process result page completed
<-rsb.completed
fmt.Println("result.....")
fmt.Println(string(encoding.JSONMarshal(rsb.resultSet)))
return rsb.resultSet
}
4 changes: 4 additions & 0 deletions sql/execution/pipeline/operator/exchange/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ func (l *LocalExchangeOperator) Run(output chan<- *types.Page) {
// GetLayout returns the output symbols of the operator's plan node.
func (l *LocalExchangeOperator) GetLayout() []*plan.Symbol {
return l.node.GetOutputSymbols()
}

// Children returns the operator's single child wrapped in a slice.
func (l *LocalExchangeOperator) Children() []operator.Operator {
return []operator.Operator{l.child}
}
4 changes: 4 additions & 0 deletions sql/execution/pipeline/operator/exchange/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ func (op *RemoteExchangeOperator) GetLayout() []*plan.Symbol {
// Complete closes the operator's inbound channel.
func (op *RemoteExchangeOperator) Complete() {
close(op.inbound)
}

// Children returns nil: no child operator is reported for a remote exchange.
func (op *RemoteExchangeOperator) Children() []operator.Operator {
return nil
}
4 changes: 4 additions & 0 deletions sql/execution/pipeline/operator/hash_aggregation_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ func (h *HashAggregationOperator) Run(output chan<- *types.Page) {
// GetLayout returns the output symbols of the operator's plan node.
func (h *HashAggregationOperator) GetLayout() []*plan.Symbol {
return h.node.GetOutputSymbols()
}

// Children returns the operator's single child wrapped in a slice.
func (h *HashAggregationOperator) Children() []Operator {
return []Operator{h.child}
}
1 change: 1 addition & 0 deletions sql/execution/pipeline/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Operator interface {
Run(output chan<- *types.Page)
// GetLayout returns the output layout.
GetLayout() []*plan.Symbol
// Children returns the child operators of this operator (nil when it has none).
Children() []Operator
}

type SourceOperator interface {
Expand Down
4 changes: 4 additions & 0 deletions sql/execution/pipeline/operator/output/result_set_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ func (op *ResultSetOutputOperator) Run(output chan<- *types.Page) {
// GetLayout always panics: the result set output operator does not expose a layout.
func (op *ResultSetOutputOperator) GetLayout() []*plan.Symbol {
panic("result set output operator should not get layout")
}

// Children returns the operator's single child wrapped in a slice.
func (op *ResultSetOutputOperator) Children() []operator.Operator {
return []operator.Operator{op.child}
}
4 changes: 4 additions & 0 deletions sql/execution/pipeline/operator/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (h *ProjectionOperator) GetLayout() []*plan.Symbol {
return h.project.GetOutputSymbols()
}

// Children returns the operator's single child wrapped in a slice.
func (h *ProjectionOperator) Children() []Operator {
return []Operator{h.child}
}

func (h *ProjectionOperator) prepare() {
h.exprCtx = expression.NewEvalContext(h.ctx)
h.exprs = make([]expression.Expression, len(h.project.Assignments))
Expand Down
4 changes: 4 additions & 0 deletions sql/execution/pipeline/operator/scan/table_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ func (op *TableScanOperator) Run(output chan<- *types.Page) {
// GetLayout returns the output symbols of the operator's plan node.
func (op *TableScanOperator) GetLayout() []*plan.Symbol {
return op.node.GetOutputSymbols()
}

// Children returns nil: no child operator is reported for a table scan.
func (op *TableScanOperator) Children() []operator.Operator {
return nil
}
4 changes: 4 additions & 0 deletions sql/execution/pipeline/operator/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ func (op *ValuesOperator) Run(output chan<- *types.Page) {
// GetLayout returns the output symbols of the operator's plan node.
func (op *ValuesOperator) GetLayout() []*plan.Symbol {
return op.node.GetOutputSymbols()
}

// Children returns nil: no child operator is reported for a values operator.
func (op *ValuesOperator) Children() []Operator {
return nil
}
2 changes: 1 addition & 1 deletion sql/execution/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func NewPipeline(taskCtx *context.TaskContext, root operator.Operator) *Pipeline
}

// Run prints a description of the pipeline's root operator to stdout and then
// runs the root operator, passing output through to it.
func (p *Pipeline) Run(output chan<- *types.Page) {
// NOTE(review): this text looks like a rendered diff - the %T print is
// probably the removed line and the renderText print its replacement;
// confirm that only one of them is live.
fmt.Printf("run pipeline, root=%T\n", p.root)
fmt.Printf("run pipeline, root=>\n%s\n", renderText(p.root))
p.root.Run(output)
}
24 changes: 24 additions & 0 deletions sql/execution/pipeline/render.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pipeline

import (
"fmt"
"strings"

"github.com/lindb/lindb/pkg/strutil"
"github.com/lindb/lindb/sql/execution/pipeline/operator"
)

// renderText renders the operator tree rooted at root as indented text, one
// operator per line, with the trailing newline removed.
func renderText(root operator.Operator) string {
	var sb strings.Builder
	rootIndent := strutil.NewIndent(0, len(root.Children()) > 0)
	rendered := writeTextOutput(&sb, rootIndent, root)
	return strings.TrimSuffix(rendered, "\n")
}

// writeTextOutput appends one line for node to sb (its indent followed by its
// Go type name without a leading "*"), then recurses into each child with the
// indent derived for that child. It returns everything accumulated in sb so far.
func writeTextOutput(sb *strings.Builder, indent *strutil.Indent, node operator.Operator) string {
	sb.WriteString(indent.NodeIndent())
	typeLine := fmt.Sprintf("%T\n", node)
	sb.WriteString(strings.TrimPrefix(typeLine, "*"))

	children := node.Children()
	lastIdx := len(children) - 1
	for idx := range children {
		child := children[idx]
		// The last sibling gets the closing connector; the others keep the branch open.
		childIndent := indent.ForChild(idx == lastIdx, len(child.Children()) > 0)
		writeTextOutput(sb, childIndent, child)
	}
	return sb.String()
}
8 changes: 7 additions & 1 deletion sql/execution/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
sqlContext "github.com/lindb/lindb/sql/context"
"github.com/lindb/lindb/sql/planner"
"github.com/lindb/lindb/sql/planner/plan"
"github.com/lindb/lindb/sql/planner/validate"
"github.com/lindb/lindb/sql/tree"
)

Expand Down Expand Up @@ -51,7 +52,12 @@ func (p *Planner) Plan(session *Session,

// plan query
logicalPlanner := planner.NewLogicalPlanner(plannerContext, planOptimizers())
return logicalPlanner.Plan()
plan := logicalPlanner.Plan()
v := validate.NewValidators()
if err := v.Validate(plannerContext, plan.Root); err != nil {
panic(err)
}
return plan
}

func (p *Planner) PlanDistribution(plan *plan.Plan) *plan.SubPlan {
Expand Down
9 changes: 8 additions & 1 deletion sql/planner/iterative/rule/push_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,27 @@ import (
"github.com/lindb/lindb/sql/planner/plan"
)

// PushTimestampIntoTableScan pushes timestamp column into table scan.
type PushTimestampIntoTableScan struct {
// Base is the embedded generic rule base, instantiated for *plan.OutputNode.
Base[*plan.OutputNode]
}

// NewPushTimestampIntoTableScan creates a PushTimestampIntoTableScan instance.
func NewPushTimestampIntoTableScan() iterative.Rule {
	r := new(PushTimestampIntoTableScan)
	// Register the rewrite method as the rule's apply callback.
	r.apply = r.pushTimestampIntoTableScan
	return r
}

func (rule *PushTimestampIntoTableScan) pushTimestampIntoTableScan(context *iterative.Context, node *plan.OutputNode) plan.PlanNode {
// pushTimestampIntoTableScan pushes timestamp column into table scan.
func (rule *PushTimestampIntoTableScan) pushTimestampIntoTableScan(
context *iterative.Context, node *plan.OutputNode,
) plan.PlanNode {
timestamp, ok := lo.Find(node.GetOutputSymbols(), func(item *plan.Symbol) bool {
return item.DataType == types.DTTimestamp
})
if !ok {
// output node has no timestamp column
return nil
}
tableScan := iterative.ExtractTableScan(context, node)
Expand All @@ -49,6 +55,7 @@ func (rule *PushTimestampIntoTableScan) pushTimestampIntoTableScan(context *iter
if _, ok = lo.Find(tableScan.GetOutputSymbols(), func(item *plan.Symbol) bool {
return item.DataType == types.DTTimestamp
}); ok {
// table scan already has timestamp column
return nil
}
tableScan.OutputSymbols = append(tableScan.OutputSymbols, timestamp)
Expand Down
Loading
Loading