package main
import (
"context"
"fmt"
"io"
"os"
"px.dev/pxapi"
"px.dev/pxapi/errdefs"
"px.dev/pxapi/types"
)
// pxl is the PxL script submitted to the cluster by main. It builds a
// DataFrame from 'http_events', keeps the upid, req_path, remote_addr and
// req_method columns, applies head(10), and displays the result under the
// table name 'http'.
var pxl = `
import px
df = px.DataFrame('http_events')
df = df[['upid', 'req_path', 'remote_addr', 'req_method']]
df = df.head(10)
px.display(df, 'http')
`
func main() {
ctx := context.Background()
client, err := pxapi.NewClient(ctx, pxapi.WithAPIKey(<YOUR_API_TOKEN_STRING>))
if err != nil {
panic(err)
}
vz, err := client.NewVizierClient(ctx, <YOUR_CLUSTER_ID_STRING>)
if err != nil {
panic(err)
}
tm := &tableMux{}
resultSet, err := vz.ExecuteScript(ctx, pxl, tm)
if err != nil && err != io.EOF {
panic(err)
}
defer resultSet.Close()
if err := resultSet.Stream(); err != nil {
if errdefs.IsCompilationError(err) {
fmt.Printf("Got compiler error: \n %s\n", err.Error())
} else {
fmt.Printf("Got error : %+v, while streaming\n", err)
}
}
stats := resultSet.Stats()
fmt.Printf("Execution Time: %v\n", stats.ExecutionTime)
fmt.Printf("Bytes received: %v\n", stats.TotalBytes)
}
// tablePrinter is a stateless record handler that writes every record it
// receives to standard output, one record per line.
type tablePrinter struct{}

// HandleInit is a no-op; the table metadata is not used.
func (p *tablePrinter) HandleInit(ctx context.Context, metadata types.TableMetadata) error {
	return nil
}

// HandleRecord prints the string form of each datum in r, each followed by a
// single space, and then terminates the line. It always returns nil.
func (p *tablePrinter) HandleRecord(ctx context.Context, r *types.Record) error {
	for i := range r.Data {
		fmt.Printf("%s ", r.Data[i].String())
	}
	fmt.Print("\n")
	return nil
}

// HandleDone is a no-op; there is nothing to flush or release.
func (p *tablePrinter) HandleDone(ctx context.Context) error {
	return nil
}
// tableMux hands out a record handler for each table it is asked to accept.
type tableMux struct{}

// AcceptTable returns a fresh tablePrinter for every table, regardless of
// the supplied metadata, and never returns an error.
func (m *tableMux) AcceptTable(ctx context.Context, metadata types.TableMetadata) (pxapi.TableRecordHandler, error) {
	handler := new(tablePrinter)
	return handler, nil
}