Skip to content

Commit

Permalink
add hdfs example
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Dec 11, 2015
1 parent 3eec5cb commit 135e845
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 0 deletions.
38 changes: 38 additions & 0 deletions examples/hdfs_source/hdfs_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"flag"
"strings"

_ "github.com/chrislusf/glow/driver"
"github.com/chrislusf/glow/flow"
"github.com/chrislusf/glow/source/hdfs"
)

// main counts the colon-separated tokens in the non-comment lines of
// every file under hdfs://localhost:9000/etc and prints the total.
// It is a word-count-style example for the hdfs source adapter.
func main() {
	flag.Parse()

	f := flow.New()
	// Read all files under /etc on the local HDFS namenode, split
	// into 8 partitions.
	lines := hdfs.Source(
		f,
		"hdfs://localhost:9000/etc",
		8,
	)

	lines.Filter(func(line string) bool {
		// Drop comment lines.
		return !strings.HasPrefix(line, "#")
	}).Map(func(line string, ch chan string) {
		// Emit each colon-separated token as its own record.
		for _, token := range strings.Split(line, ":") {
			ch <- token
		}
	}).Map(func(key string) int {
		// One count per token.
		return 1
	}).Reduce(func(x int, y int) int {
		return x + y
	}).Map(func(x int) {
		println("count:", x)
	}).Run()
}
11 changes: 11 additions & 0 deletions source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Adapter

## Purpose

This folder contains implementations for external data sources.

The different data sources are under different folders because:

1. In most cases, only a few adapters are needed for a program. Separating
them into different folders reduces the total compiled binary size, since a
program only pulls in the adapters it imports.
2. It is easier to implement and maintain new adapters independently.
88 changes: 88 additions & 0 deletions source/hdfs/hdfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package hdfs

import (
"bufio"
"fmt"
"log"
"strings"

"github.com/chrislusf/glow/flow"
"github.com/colinmarc/hdfs"
)

// Source lists the files under a hdfs folder and returns a dataset that
// streams every text line of every file, split into shard partitions.
// This is provided more as an example. You can copy the code and customize
// any way you want.
func Source(f *flow.FlowContext, hdfsLocation string, shard int) *flow.Dataset {
	files, err := List(hdfsLocation)
	if err != nil {
		log.Fatalf("Can not list files under %s:%v", hdfsLocation, err)
	}
	fileNames := f.Slice(files)
	return fileNames.Partition(shard).Map(TextFile)
}

// List generates a full list of file locations directly under the given
// location, which must have a prefix of hdfs://.
func List(hdfsLocation string) (locations []string, err error) {
	namenode, path, err := splitLocationToParts(hdfsLocation)
	if err != nil {
		return nil, err
	}

	client, err := hdfs.New(namenode)
	if err != nil {
		return nil, fmt.Errorf("create hdfs client to %s: %v", namenode, err)
	}

	// path is already absolute ("/..."); the original prepended another
	// "/" here and asked the namenode for a double-slash path ("//etc").
	fileInfos, err := client.ReadDir(path)
	if err != nil {
		return nil, fmt.Errorf("list files under %s: %v", path, err)
	}

	for _, fi := range fileInfos {
		locations = append(locations, hdfsLocation+"/"+fi.Name())
	}
	return locations, nil
}

// splitLocationToParts splits an hdfs:// location into the namenode
// address ("host:port") and an absolute file path. A location with no
// path component, e.g. "hdfs://nn:9000", yields "/" as the path.
func splitLocationToParts(location string) (namenode, path string, err error) {
	hdfsPrefix := "hdfs://"
	if !strings.HasPrefix(location, hdfsPrefix) {
		return "", "", fmt.Errorf("parameter %s should start with hdfs://", location)
	}

	parts := strings.SplitN(location[len(hdfsPrefix):], "/", 2)
	if len(parts) == 1 {
		// No "/" after the authority: SplitN returns a single element,
		// and the original indexed parts[1] unconditionally and panicked.
		return parts[0], "/", nil
	}
	return parts[0], "/" + parts[1], nil
}

// TextFile streams every line of the HDFS file at location into lines.
// It is intended to run as a flow mapper over file locations (see Source).
func TextFile(location string, lines chan string) {
	namenode, path, err := splitLocationToParts(location)
	if err != nil {
		// Report the bad location instead of silently skipping it
		// (the original returned without any diagnostic). Kept
		// non-fatal to preserve the best-effort skip behavior.
		log.Printf("Skip %s: %v", location, err)
		return
	}

	client, err := hdfs.New(namenode)
	if err != nil {
		log.Fatalf("failed to create client to %s:%v\n", namenode, err)
	}

	file, err := client.Open(path)
	if err != nil {
		log.Fatalf("Can not open file %s: %v", location, err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines <- scanner.Text()
	}
	// Scanner errors (e.g. a line exceeding the buffer) surface only
	// here, after the loop ends.
	if err := scanner.Err(); err != nil {
		log.Printf("Scan file %s: %v", location, err)
	}
}

0 comments on commit 135e845

Please sign in to comment.