Parallel write

Speedup
Alexander NeonXP Kiryukhin 2019-05-30 14:05:08 +03:00
parent 97fd40df4d
commit 2f4b6e9059
9 changed files with 394 additions and 323 deletions
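
The commit replaces the single read-and-write loop with one reader goroutine that parses the PBF file into three channels (nodes, ways, relations) and a pool of writer goroutines that drain them and bulk-write fixed-size blocks. Below is a minimal, generic sketch of that fan-out pattern; the `item` type, the `workers`/`blockSize` constants, and the use of `sync.WaitGroup` are stand-ins for illustration only, whereas the project itself uses separate Node/Way/Relation channels, MongoDB bulk writes, and the github.com/neonxp/rutina runner.

```go
package main

import (
	"fmt"
	"sync"
)

// item stands in for a parsed OSM object (Node, Way or Relation in the real code).
type item struct{ id int }

func main() {
	const workers = 4   // cf. the -concurrency flag
	const blockSize = 3 // cf. the -block flag

	ch := make(chan item)
	var wg sync.WaitGroup

	// writer workers: buffer incoming items and "flush" a block once it is full
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			buf := make([]item, 0, blockSize)
			flush := func() {
				if len(buf) > 0 {
					fmt.Printf("[%d] writing block of %d\n", worker, len(buf))
					buf = buf[:0]
				}
			}
			for it := range ch {
				buf = append(buf, it)
				if len(buf) == blockSize {
					flush()
				}
			}
			flush() // channel closed: write whatever is left in the buffer
		}(w)
	}

	// reader: produce items and hand each one to whichever worker is free
	for i := 0; i < 10; i++ {
		ch <- item{id: i}
	}
	close(ch)
	wg.Wait()
}
```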

README.md

@@ -8,14 +8,18 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac
## Usage
`./osm2go -osmfile PATH_TO_OSM_FILE [-dbconnection mongodb://localhost:27017] [-dbname osm] [-initial=true] [-concurrency=16] [-block=1000]`
`./osm2go -osmfile=PATH_TO_OSM_FILE`
* `osmfile` required, path to *.osm or *.osm.pbf file
* `dbconnection` optional, mongodb connection string (default: `mongodb://localhost:27017`)
* `dbname` optional, mongodb database name (default: `osm`)
* `initial` optional, use insert instead of upsert; faster, but does not check whether an item already exists (default: `false`)
* `concurrency` optional, number of parallel read processes (default: `16`)
* `block` optional, block size to bulk write (default: `1000`)
All flags:
* `-osmfile` (required) OSM file
* `-initial` (default:false) Is initial import (uses insert, not upsert)
* `-indexes` (default:false) Create indexes (only needed on the first run)
* `-dbconnection` (default:"mongodb://localhost:27017") Mongo connection string
* `-dbname` (default:"map") Mongo database name
* `-layers` (default:"nodes,ways,relations") Layers to import
* `-block` (default:1000) Block size to bulk write
* `-concurrency` (default:32) Number of concurrent read and write workers
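For instance, a first-time import that relies on plain inserts could be started as `./osm2go -osmfile=planet.osm.pbf -initial=true` (the file name here is only illustrative); indexes can then be built afterwards with a separate run that passes `-indexes=true`.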
## Example

go.mod (2 changed lines)

@@ -7,6 +7,8 @@ require (
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/go-cmp v0.3.0 // indirect
github.com/neonxp/rutina v0.4.3
github.com/paulmach/go.geojson v1.4.0
github.com/paulmach/orb v0.1.3
github.com/paulmach/osm v0.0.1
github.com/stretchr/testify v1.3.0 // indirect

go.sum (4 changed lines)

@@ -10,6 +10,10 @@ github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/neonxp/rutina v0.4.3 h1:It7wu2L1FlPMC7UFGS7cTdMPWqd2hwL6+xP8UP72xZc=
github.com/neonxp/rutina v0.4.3/go.mod h1:QJOHIcMI4Lh4Nyyi0v119KZllW1S5KxJyy/zg5KQXno=
github.com/paulmach/go.geojson v1.4.0 h1:5x5moCkCtDo5x8af62P9IOAYGQcYHtxz2QJ3x1DoCgY=
github.com/paulmach/go.geojson v1.4.0/go.mod h1:YaKx1hKpWF+T2oj2lFJPsW/t1Q5e1jQI61eoQSTwpIs=
github.com/paulmach/orb v0.1.3 h1:Wa1nzU269Zv7V9paVEY1COWW8FCqv4PC/KJRbJSimpM=
github.com/paulmach/orb v0.1.3/go.mod h1:VFlX/8C+IQ1p6FTRRKzKoOPJnvEtA5G0Veuqwbu//Vk=
github.com/paulmach/osm v0.0.1 h1:TxU/uZnJ+ssntblY6kSieOP4tQv31VW8wfyf4L3BeKs=

helpers.go (new file, 14 lines)

@@ -0,0 +1,14 @@
package main
import "github.com/paulmach/osm"
func convertTags(tags osm.Tags) []Tag {
result := make([]Tag, 0, len(tags))
for _, t := range tags {
result = append(result, Tag{
Key: t.Key,
Value: t.Value,
})
}
return result
}

indexes.go (new file, 96 lines)

@@ -0,0 +1,96 @@
package main
import (
"context"
"log"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
)
func createIndexes(db *mongo.Database) error {
opts := options.CreateIndexes().SetMaxTime(1000)
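// note: SetMaxTime expects a time.Duration, so a bare 1000 here is only 1000 nanoseconds; a value such as 10*time.Minute was probably intended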
nodes := db.Collection("nodes")
log.Println("creating indexes for nodes")
created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
}, {
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
}, {
Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
}, opts)
if err != nil {
return err
}
log.Println(created)
log.Println("creating geoindexes for nodes")
if err := geoIndex(nodes, "location"); err != nil {
return err
}
log.Println("creating indexes for ways")
ways := db.Collection("ways")
created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
}, {
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
}, {
Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
}, opts)
if err != nil {
return err
}
log.Println(created)
relations := db.Collection("relations")
log.Println("creating geoindexes for relations")
created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
}, {
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
}, {
Keys: bsonx.Doc{{"tags", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
{
Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
}, opts)
if err != nil {
return err
}
log.Println(created)
if err := geoIndex(relations, "members.coords"); err != nil {
return err
}
log.Println("indexes created")
return nil
}
func geoIndex(col *mongo.Collection, key string) error {
_, err := col.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{
Key: key, Value: bsonx.String("2dsphere"),
}},
Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
},
)
return err
}

main.go (308 changed lines)

@@ -4,15 +4,13 @@ import (
"context"
"flag"
"log"
"os"
"strings"
"time"
"github.com/paulmach/osm"
"github.com/paulmach/osm/osmpbf"
"go.mongodb.org/mongo-driver/bson"
"github.com/neonxp/rutina"
_ "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
)
func main() {
@@ -20,10 +18,13 @@ func main() {
dbname := flag.String("dbname", "map", "Mongo database name")
osmfile := flag.String("osmfile", "", "OSM file")
initial := flag.Bool("initial", false, "Is initial import")
indexes := flag.Bool("indexes", false, "Just create indexes")
concurrency := flag.Int("concurrency", 16, "Concurrency")
indexes := flag.Bool("indexes", false, "Create indexes")
layersString := flag.String("layers", "nodes,ways,relations", "Layers to import")
blockSize := flag.Int("block", 1000, "Block size to bulk write")
concurrency := flag.Int("concurrency", 32, "Concurrency read and write")
flag.Parse()
layers := strings.Split(*layersString, ",")
r := rutina.New()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(*dbconnection))
if err != nil {
@@ -37,289 +38,24 @@
log.Fatal(err)
}
log.Println("Indexes created")
return
}
log.Printf("Started import file %s to db %s", *osmfile, *dbname)
nodesCh := make(chan Node, 1)
waysCh := make(chan Way, 1)
relationsCh := make(chan Relation, 1)
if err := read(db, *osmfile, *initial, *concurrency, *blockSize); err != nil {
for i := 0; i < *concurrency; i++ {
worker := i
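// copy the loop variable so each writer goroutine logs its own worker id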
r.Go(func(ctx context.Context) error {
return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker)
})
}
r.Go(func(ctx context.Context) error {
return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers)
})
if err := r.Wait(); err != nil {
log.Fatal(err)
}
}
func read(db *mongo.Database, file string, initial bool, concurrency int, blockSize int) error {
nodes := db.Collection("nodes")
ways := db.Collection("ways")
relations := db.Collection("relations")
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
opts := (new(options.BulkWriteOptions)).SetOrdered(false)
nc := 0
wc := 0
rc := 0
scanner := osmpbf.New(context.Background(), f, concurrency)
defer scanner.Close()
bufferNodes := make([]mongo.WriteModel, 0, blockSize)
bufferWays := make([]mongo.WriteModel, 0, blockSize)
bufferRelations := make([]mongo.WriteModel, 0, blockSize)
for scanner.Scan() {
o := scanner.Object()
switch o := o.(type) {
case *osm.Way:
nodes := make([]int64, 0, len(o.Nodes))
for _, v := range o.Nodes {
nodes = append(nodes, int64(v.ID))
}
w := Way{
OsmID: int64(o.ID),
Tags: convertTags(o.Tags),
Nodes: nodes,
Timestamp: o.Timestamp,
Version: o.Version,
Visible: o.Visible,
}
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(w)
bufferWays = append(bufferWays, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(w)
um.SetFilter(bson.M{"osm_id": w.OsmID})
bufferWays = append(bufferWays, um)
}
wc++
case *osm.Node:
n := Node{
OsmID: int64(o.ID),
Location: Coords{
Type: "Point",
Coordinates: []float64{
o.Lon,
o.Lat,
}},
Tags: convertTags(o.Tags),
Version: o.Version,
Timestamp: o.Timestamp,
Visible: o.Visible,
}
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(n)
bufferNodes = append(bufferNodes, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(n)
um.SetFilter(bson.M{"osm_id": n.OsmID})
bufferNodes = append(bufferNodes, um)
}
nc++
case *osm.Relation:
members := make([]Member, len(o.Members))
for _, v := range o.Members {
members = append(members, Member{
Type: v.Type,
Version: v.Version,
Orientation: v.Orientation,
Ref: v.Ref,
Role: v.Role,
Location: Coords{
Type: "Point",
Coordinates: []float64{
v.Lon,
v.Lat,
}},
})
}
r := Relation{
OsmID: int64(o.ID),
Tags: convertTags(o.Tags),
Version: o.Version,
Timestamp: o.Timestamp,
Visible: o.Visible,
Members: members,
}
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(r)
bufferRelations = append(bufferRelations, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(r)
um.SetFilter(bson.M{"osm_id": r.OsmID})
bufferRelations = append(bufferRelations, um)
}
rc++
}
if len(bufferNodes) == blockSize {
if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
return err
}
bufferNodes = make([]mongo.WriteModel, 0, blockSize)
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
if len(bufferWays) == blockSize {
if _, err := nodes.BulkWrite(context.Background(), bufferWays, opts); err != nil {
return err
}
bufferWays = make([]mongo.WriteModel, 0, blockSize)
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
if len(bufferRelations) == blockSize {
if _, err := nodes.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
return err
}
bufferRelations = make([]mongo.WriteModel, 0, blockSize)
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
}
if len(bufferNodes) != 0 {
if _, err := nodes.BulkWrite(context.Background(), bufferNodes, opts); err != nil {
return err
}
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
if len(bufferWays) != 0 {
if _, err := ways.BulkWrite(context.Background(), bufferWays, opts); err != nil {
return err
}
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
if len(bufferRelations) != 0 {
if _, err := relations.BulkWrite(context.Background(), bufferRelations, opts); err != nil {
return err
}
log.Printf("Nodes: %d Ways: %d Relations: %d", nc, wc, rc)
}
log.Println("Import done")
scanErr := scanner.Err()
if scanErr != nil {
return scanErr
}
return nil
}
func createIndexes(db *mongo.Database) error {
opts := options.CreateIndexes().SetMaxTime(1000)
nodes := db.Collection("nodes")
log.Println("creating indexes for nodes")
created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
},
{
Keys: bsonx.Doc{
{"tags.key", bsonx.Int32(-1)},
{"tags.value", bsonx.Int32(-1)},
},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
}, opts)
if err != nil {
return err
}
log.Println(created)
log.Println("creating geoindexes for nodes")
if err := geoIndex(nodes, "location"); err != nil {
return err
}
log.Println("creating indexes for ways")
ways := db.Collection("ways")
created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
},
{
Keys: bsonx.Doc{
{"tags.key", bsonx.Int32(-1)},
{"tags.value", bsonx.Int32(-1)},
},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
}, opts)
if err != nil {
return err
}
log.Println(created)
relations := db.Collection("relations")
log.Println("creating geoindexes for relations")
created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
},
{
Keys: bsonx.Doc{
{"tags.key", bsonx.Int32(-1)},
{"tags.value", bsonx.Int32(-1)},
},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
{
Keys: bsonx.Doc{{"members.ref", bsonx.Int32(-1)}},
Options: (options.Index()).SetBackground(true).SetSparse(true),
},
}, opts)
if err != nil {
return err
}
log.Println(created)
if err := geoIndex(relations, "members.coords"); err != nil {
return err
}
log.Println("indexes created")
return nil
}
func convertTags(tags osm.Tags) []Tag {
result := make([]Tag, 0, len(tags))
for _, t := range tags {
result = append(result, Tag{
Key: t.Key,
Value: t.Value,
})
}
return result
}
func simpleIndex(col *mongo.Collection, keys []string, unique bool) error {
idxKeys := bsonx.Doc{}
for _, e := range keys {
idxKeys.Append(e, bsonx.Int32(1))
}
_, err := col.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: idxKeys,
Options: options.Index().SetUnique(unique).SetSparse(true).SetBackground(true),
},
)
return err
}
func geoIndex(col *mongo.Collection, key string) error {
_, err := col.Indexes().CreateOne(
context.Background(),
mongo.IndexModel{
Keys: bsonx.Doc{{
Key: key, Value: bsonx.Int32(1),
}},
Options: options.Index().SetSphereVersion(2).SetSparse(true).SetBackground(true),
},
)
return err
}


@@ -9,54 +9,54 @@ import (
)
type Coords struct {
Type string `bson:"type"`
Coordinates []float64 `bson:"coordinates"`
Type string `json:"type" bson:"type"`
Coordinates []float64 `json:"coordinates" bson:"coordinates"`
}
type Node struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
OsmID int64 `bson:"osm_id"`
Visible bool `bson:"visible"`
Version int `bson:"version,omitempty"`
Timestamp time.Time `bson:"timestamp"`
Tags []Tag `bson:"tags,omitempty"`
Location Coords `bson:"location"`
ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
OsmID int64 `json:"osm_id" bson:"osm_id"`
Visible bool `json:"visible" bson:"visible"`
Version int `json:"version,omitempty" bson:"version,omitempty"`
Timestamp time.Time `json:"timestamp" bson:"timestamp"`
Tags []Tag `json:"tags,omitempty" bson:"tags,omitempty"`
Location Coords `json:"location" bson:"location"`
}
type Way struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
OsmID int64 `bson:"osm_id"`
Visible bool `bson:"visible"`
Version int `bson:"version"`
Timestamp time.Time `bson:"timestamp"`
Nodes []int64 `bson:"nodes"`
Tags []Tag `bson:"tags"`
ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
OsmID int64 `json:"osm_id" bson:"osm_id"`
Visible bool `json:"visible" bson:"visible"`
Version int `json:"version" bson:"version"`
Timestamp time.Time `json:"timestamp" bson:"timestamp"`
Nodes []int64 `json:"nodes" bson:"nodes"`
Tags []Tag `json:"tags" bson:"tags"`
}
type Relation struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
OsmID int64 `bson:"osm_id"`
Visible bool `bson:"visible"`
Version int `bson:"version"`
Timestamp time.Time `bson:"timestamp"`
Members []Member `bson:"members"`
Tags []Tag `bson:"tags"`
ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
OsmID int64 `json:"osm_id" bson:"osm_id"`
Visible bool `json:"visible" bson:"visible"`
Version int `json:"version" bson:"version"`
Timestamp time.Time `json:"timestamp" bson:"timestamp"`
Members []Member `json:"members" bson:"members"`
Tags []Tag `json:"tags" bson:"tags"`
}
type Member struct {
Type osm.Type `bson:"type"`
Ref int64 `bson:"ref"`
Role string `bson:"role"`
Type osm.Type `json:"type" bson:"type"`
Ref int64 `json:"ref" bson:"ref"`
Role string `json:"role" bson:"role"`
Version int
Location Coords `bson:"location"`
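// Location is now a pointer with omitempty so that members without coordinates are simply omitted instead of being stored as a point at (0, 0)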
Location *Coords `json:"location,omitempty" bson:"location,omitempty"`
// Orientation is the direction of the way around a ring of a multipolygon.
// Only valid for multipolygon or boundary relations.
Orientation orb.Orientation `bson:"orientation,omitempty"`
Orientation orb.Orientation `json:"orientation,omitempty" bson:"orientation,omitempty"`
}
type Tag struct {
Key string `bson:"key"`
Value string `bson:"value"`
Key string `json:"key" bson:"key"`
Value string `json:"value" bson:"value"`
}

reader.go (new file, 114 lines)

@@ -0,0 +1,114 @@
package main
import (
"context"
"log"
"os"
"github.com/paulmach/osm"
"github.com/paulmach/osm/osmpbf"
)
func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, concurrency int, layers []string) error {
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
scanner := osmpbf.New(context.Background(), f, concurrency)
defer scanner.Close()
layersToImport := map[string]bool{
"ways": false,
"nodes": false,
"relations": false,
}
for _, l := range layers {
layersToImport[l] = true
}
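// only object types selected via the -layers flag are forwarded; everything else is skipped in the switch below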
for scanner.Scan() {
if ctx.Err() != nil {
return ctx.Err()
}
o := scanner.Object()
switch o := o.(type) {
case *osm.Way:
if !layersToImport["ways"] {
continue
}
nodes := make([]int64, 0, len(o.Nodes))
for _, v := range o.Nodes {
nodes = append(nodes, int64(v.ID))
}
w := Way{
OsmID: int64(o.ID),
Tags: convertTags(o.Tags),
Nodes: nodes,
Timestamp: o.Timestamp,
Version: o.Version,
Visible: o.Visible,
}
waysCh <- w
case *osm.Node:
if !layersToImport["nodes"] {
continue
}
n := Node{
OsmID: int64(o.ID),
Location: Coords{
Type: "Point",
Coordinates: []float64{
o.Lon,
o.Lat,
}},
Tags: convertTags(o.Tags),
Version: o.Version,
Timestamp: o.Timestamp,
Visible: o.Visible,
}
nodesCh <- n
case *osm.Relation:
if !layersToImport["relations"] {
continue
}
members := make([]Member, 0, len(o.Members))
for _, v := range o.Members {
var location *Coords
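// members without node coordinates come through with lat/lon 0,0; attach a location only when real coordinates are present (a member exactly at 0,0 is also skipped by this check)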
if v.Lat != 0.0 && v.Lon != 0.0 {
location = &Coords{
Type: "Point",
Coordinates: []float64{
v.Lon,
v.Lat,
}}
}
members = append(members, Member{
Type: v.Type,
Version: v.Version,
Orientation: v.Orientation,
Ref: v.Ref,
Role: v.Role,
Location: location,
})
}
r := Relation{
OsmID: int64(o.ID),
Tags: convertTags(o.Tags),
Version: o.Version,
Timestamp: o.Timestamp,
Visible: o.Visible,
Members: members,
}
relationsCh <- r
}
}
log.Println("Read done")
scanErr := scanner.Err()
if scanErr != nil {
return scanErr
}
return nil
}

writer.go (new file, 101 lines)

@@ -0,0 +1,101 @@
package main
import (
"context"
"log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error {
nodes := db.Collection("nodes")
ways := db.Collection("ways")
relations := db.Collection("relations")
opts := (new(options.BulkWriteOptions)).SetOrdered(false)
nodesBuffer := make([]mongo.WriteModel, 0, blockSize)
waysBuffer := make([]mongo.WriteModel, 0, blockSize)
relationsBuffer := make([]mongo.WriteModel, 0, blockSize)
nc := 0
wc := 0
rc := 0
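// drain all three channels, flushing a buffer as soon as it reaches blockSize; on shutdown (ctx.Done) whatever is still buffered gets written out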
for {
select {
case w := <-waysCh:
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(w)
waysBuffer = append(waysBuffer, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(w)
um.SetFilter(bson.M{"osm_id": w.OsmID})
waysBuffer = append(waysBuffer, um)
}
case n := <-nodesCh:
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(n)
nodesBuffer = append(nodesBuffer, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(n)
um.SetFilter(bson.M{"osm_id": n.OsmID})
nodesBuffer = append(nodesBuffer, um)
}
case r := <-relationsCh:
if initial {
um := mongo.NewInsertOneModel()
um.SetDocument(r)
relationsBuffer = append(relationsBuffer, um)
} else {
um := mongo.NewUpdateOneModel()
um.SetUpsert(true)
um.SetUpdate(r)
um.SetFilter(bson.M{"osm_id": r.OsmID})
relationsBuffer = append(relationsBuffer, um)
}
case <-ctx.Done():
log.Printf("[%d] saving last info in buffers...", worker)
if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
return err
}
if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
return err
}
if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
return err
}
log.Printf("[%d] Done", worker)
return nil
}
if len(nodesBuffer) == blockSize {
nc++
log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
return err
}
nodesBuffer = make([]mongo.WriteModel, 0)
}
if len(waysBuffer) == blockSize {
wc++
log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
return err
}
waysBuffer = make([]mongo.WriteModel, 0)
}
if len(relationsBuffer) == blockSize {
rc++
log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
return err
}
relationsBuffer = make([]mongo.WriteModel, 0)
}
}
}