Store objects in a single collection (Overpass-like JSON)

Filter deleted objects
Alexander NeonXP Kiryukhin 2019-06-03 13:59:49 +03:00
parent 2f4b6e9059
commit c362271047
6 changed files with 79 additions and 199 deletions


@@ -4,22 +4,22 @@ Simple loader from osm dump file to mongodb. Based on https://github.com/paulmac
 ## Build
-`go build -o osm2go`
+`go build -o osm2mgo`
 ## Usage
-`./osm2go -osmfile=PATH_TO_OSM_FILE`
-All flags:
-* `-osmfile` (required) OSM file
-* `-initial` (default:false) Is initial import (uses insert, not upsert)
-* `-indexes` (default:false) Create indexes (needs only first time)
-* `-dbconnection` (default:"mongodb://localhost:27017") Mongo database name
-* `-dbname` (default:"map") Mongo database name
-* `-layers` (default:"nodes,ways,relations") Layers to import
-* `-block` (default:1000) Block size to bulk write
-* `-concurrency` (default:32) Concurrency read and write
+`./osm2mgo flags`
+### Flags:
+* `-osmfile string` Path to OSM file (PBF format only) (default "./RU.osm.pbf")
+* `-dbconnection string` Mongo database name (default "mongodb://localhost:27017")
+* `-dbname string` Mongo database name (default "map")
+* `-initial` Is initial import?
+* `-indexes` Create indexes
+* `-layers string` Layers to import (default "nodes,ways,relations")
+* `-concurrency int` Workers count (default 32)
+* `-block int` Block size to bulk write (default 1000)
 ## Example
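For instance, a first-time import that also builds the indexes could be invoked as `./osm2mgo -osmfile=./RU.osm.pbf -initial -indexes` (an illustrative command assembled from the flags above, not a line taken from the README itself).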


@@ -2,7 +2,6 @@ package main
 import (
     "context"
-    "log"
     "go.mongodb.org/mongo-driver/mongo"
     "go.mongodb.org/mongo-driver/mongo/options"
@@ -11,58 +10,9 @@ import (
 func createIndexes(db *mongo.Database) error {
     opts := options.CreateIndexes().SetMaxTime(1000)
-    nodes := db.Collection("nodes")
-    log.Println("creating indexes for nodes")
-    created, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
+    nodes := db.Collection("objects")
+    _, err := nodes.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
         {
-            Keys:    bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
-        }, {
-            Keys:    bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
-        }, {
-            Keys:    bsonx.Doc{{"tags", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true),
-        },
-    }, opts)
-    if err != nil {
-        return err
-    }
-    log.Println(created)
-    log.Println("creating geoindexes for nodes")
-    if err := geoIndex(nodes, "location"); err != nil {
-        return err
-    }
-    log.Println("creating indexes for ways")
-    ways := db.Collection("ways")
-    created, err = ways.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
-        {
-            Keys:    bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
-        }, {
-            Keys:    bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
-        }, {
-            Keys:    bsonx.Doc{{"tags", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true),
-        },
-    }, opts)
-    if err != nil {
-        return err
-    }
-    log.Println(created)
-    relations := db.Collection("relations")
-    log.Println("creating geoindexes for relations")
-    created, err = relations.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
-        {
-            Keys:    bsonx.Doc{{"osm_id", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
-        }, {
-            Keys:    bsonx.Doc{{"osm_id", bsonx.Int32(-1)}, {"version", bsonx.Int32(-1)}},
-            Options: (options.Index()).SetBackground(true).SetSparse(true).SetUnique(false),
-        }, {
             Keys:    bsonx.Doc{{"tags", bsonx.Int32(-1)}},
             Options: (options.Index()).SetBackground(true).SetSparse(true),
         },
@@ -74,11 +24,9 @@ func createIndexes(db *mongo.Database) error {
     if err != nil {
         return err
     }
-    log.Println(created)
-    if err := geoIndex(relations, "members.coords"); err != nil {
+    if err := geoIndex(nodes, "location"); err != nil {
         return err
     }
-    log.Println("indexes created")
     return nil
 }
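The new `createIndexes` keeps the `tags` index on the single collection and one `geoIndex(nodes, "location")` call. The `geoIndex` helper itself lives elsewhere in the repository and is not part of this diff; a minimal sketch of what such a helper could look like, using the same driver APIs as the code above (an assumption, not the project's actual implementation):

```go
package main

import (
    "context"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/x/bsonx"
)

// geoIndexSketch creates a 2dsphere index on the given field so the GeoJSON
// Coords documents written by the importer can be queried spatially.
// Hypothetical helper; the repository's real geoIndex is not shown in this diff.
func geoIndexSketch(coll *mongo.Collection, field string) error {
    _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
        Keys: bsonx.Doc{{Key: field, Value: bsonx.String("2dsphere")}},
    })
    return err
}
```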

main.go

@@ -16,12 +16,12 @@ import (
 func main() {
     dbconnection := flag.String("dbconnection", "mongodb://localhost:27017", "Mongo database name")
     dbname := flag.String("dbname", "map", "Mongo database name")
-    osmfile := flag.String("osmfile", "", "OSM file")
-    initial := flag.Bool("initial", false, "Is initial import")
+    osmfile := flag.String("osmfile", "./RU.osm.pbf", "Path to OSM file (PBF format only)")
+    initial := flag.Bool("initial", false, "Is initial import?")
     indexes := flag.Bool("indexes", false, "Create indexes")
     layersString := flag.String("layers", "nodes,ways,relations", "Layers to import")
     blockSize := flag.Int("block", 1000, "Block size to bulk write")
-    concurrency := flag.Int("concurrency", 32, "Concurrency read and write")
+    concurrency := flag.Int("concurrency", 32, "Workers count")
     flag.Parse()
     layers := strings.Split(*layersString, ",")
     r := rutina.New()
@@ -34,26 +34,25 @@ func main() {
     db := client.Database(*dbname)
     if *indexes {
-        log.Println("Creating indexes...")
         if err := createIndexes(db); err != nil {
             log.Fatal(err)
         }
-        log.Println("Indexes created")
+        log.Println("Done!")
     }
-    log.Printf("Started import file %s to db %s", *osmfile, *dbname)
-    nodesCh := make(chan Node, 1)
-    waysCh := make(chan Way, 1)
-    relationsCh := make(chan Relation, 1)
+    log.Printf("Started import file %s to db %s (%d workers)", *osmfile, *dbname, *concurrency)
+    insertCh := make(chan Object, 1)
     for i := 0; i < *concurrency; i++ {
         worker := i
         r.Go(func(ctx context.Context) error {
-            return write(ctx, db, nodesCh, waysCh, relationsCh, *initial, *blockSize, worker)
+            return write(ctx, db, insertCh, *initial, *blockSize, worker)
         })
     }
     r.Go(func(ctx context.Context) error {
-        return read(ctx, *osmfile, nodesCh, waysCh, relationsCh, *concurrency, layers)
+        return read(ctx, *osmfile, insertCh, *concurrency, layers)
     })
     if err := r.Wait(); err != nil {
         log.Fatal(err)
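main.go now fans a single `chan Object` out to `concurrency` writer goroutines plus one reader goroutine, all supervised by rutina. The same shape is sketched below with `golang.org/x/sync/errgroup`, purely as an illustration of the pattern; the names and the payload type are placeholders, not the project's code.

```go
package main

import (
    "context"
    "log"

    "golang.org/x/sync/errgroup"
)

// fanOut mirrors the layout main.go sets up: one producer goroutine feeding a
// single channel, n consumer goroutines draining it, and the whole group torn
// down as soon as any goroutine fails. errgroup stands in for rutina here.
func fanOut(ctx context.Context, n int) error {
    g, ctx := errgroup.WithContext(ctx)
    ch := make(chan int, 1) // stand-in for chan Object

    // producer, playing the role of read()
    g.Go(func() error {
        defer close(ch)
        for i := 0; i < 100; i++ {
            select {
            case ch <- i:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return nil
    })

    // consumers, playing the role of write()
    for w := 0; w < n; w++ {
        worker := w
        g.Go(func() error {
            for v := range ch {
                log.Printf("worker %d got %d", worker, v)
            }
            return nil
        })
    }
    return g.Wait()
}

func main() {
    if err := fanOut(context.Background(), 4); err != nil {
        log.Fatal(err)
    }
}
```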


@@ -5,7 +5,6 @@ import (
     "github.com/paulmach/orb"
     "github.com/paulmach/osm"
-    "go.mongodb.org/mongo-driver/bson/primitive"
 )
 
 type Coords struct {
@@ -13,34 +12,27 @@ type Coords struct {
     Coordinates []float64 `json:"coordinates" bson:"coordinates"`
 }
 
-type Node struct {
-    ID        primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
-    OsmID     int64              `json:"osm_id" bson:"osm_id"`
-    Visible   bool               `json:"visible" bson:"visible"`
-    Version   int                `json:"version,omitempty" bson:"version,omitempty"`
-    Timestamp time.Time          `json:"timestamp" bson:"timestamp"`
-    Tags      []Tag              `json:"tags,omitempty" bson:"tags,omitempty"`
-    Location  Coords             `json:"location" bson:"location"`
-}
+type ItemType string
+
+const (
+    NodeType     ItemType = "node"
+    WayType      ItemType = "way"
+    RelationType ItemType = "relation"
+)
+
+type ID struct {
+    ID      int64    `json:"id" bson:"id"`
+    Type    ItemType `json:"type" bson:"type"`
+    Version int      `json:"version" bson:"version"`
+}
 
-type Way struct {
-    ID        primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
-    OsmID     int64              `json:"osm_id" bson:"osm_id"`
-    Visible   bool               `json:"visible" bson:"visible"`
-    Version   int                `json:"version" bson:"version"`
-    Timestamp time.Time          `json:"timestamp" bson:"timestamp"`
-    Nodes     []int64            `json:"nodes" bson:"nodes"`
-    Tags      []Tag              `json:"tags" bson:"tags"`
-}
-
-type Relation struct {
-    ID        primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
-    OsmID     int64              `json:"osm_id" bson:"osm_id"`
-    Visible   bool               `json:"visible" bson:"visible"`
-    Version   int                `json:"version" bson:"version"`
-    Timestamp time.Time          `json:"timestamp" bson:"timestamp"`
-    Members   []Member           `json:"members" bson:"members"`
-    Tags      []Tag              `json:"tags" bson:"tags"`
-}
+type Object struct {
+    ID        ID        `json:"_id" bson:"_id"`
+    Timestamp time.Time `json:"timestamp" bson:"timestamp"`
+    Tags      []Tag     `json:"tags" bson:"tags"`
+    Location  Coords    `json:"location,omitempty" bson:"location,omitempty"`
+    Nodes     []int64   `json:"nodes,omitempty" bson:"nodes,omitempty"`
+    Members   []Member  `json:"members,omitempty" bson:"members,omitempty"`
+}
 
 type Member struct {
@@ -48,12 +40,11 @@ type Member struct {
     Ref  int64  `json:"ref" bson:"ref"`
     Role string `json:"role" bson:"role"`
-    Version int
-    Location *Coords `json:"location,omitempty" bson:"location,omitempty"`
+    Location *Coords `json:"location" bson:"location"`
     // Orientation is the direction of the way around a ring of a multipolygon.
     // Only valid for multipolygon or boundary relations.
-    Orientation orb.Orientation `json:"orienation,omitempty" bson:"orienation,omitempty"`
+    Orientation orb.Orientation `json:"orienation" bson:"orienation"`
 }
 
 type Tag struct {
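With the per-type structs gone, every OSM element becomes one `Object` keyed by the compound `ID`. A quick sketch of the shape this produces, using the types declared above; the id, version and coordinates are made-up values, and `Tags` is left empty because the `Tag` struct's fields are not shown in this diff:

```go
// exampleNode shows how a single OSM node maps onto the unified document: the
// compound _id carries the OSM id, the object type and the version, and only
// the node-specific Location field is populated.
func exampleNode() Object {
    return Object{
        ID:        ID{ID: 123456789, Type: NodeType, Version: 3},
        Timestamp: time.Now(),
        Location: Coords{
            Type:        "Point",
            Coordinates: []float64{37.6176, 55.7558}, // lon, lat
        },
    }
}
```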


@@ -9,7 +9,7 @@ import (
     "github.com/paulmach/osm/osmpbf"
 )
 
-func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, concurrency int, layers []string) error {
+func read(ctx context.Context, file string, insertCh chan Object, concurrency int, layers []string) error {
     f, err := os.Open(file)
     if err != nil {
         return err
@@ -35,7 +35,7 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way,
         o := scanner.Object()
         switch o := o.(type) {
         case *osm.Way:
-            if !layersToImport["ways"] {
+            if !layersToImport["ways"] || !o.Visible {
                 continue
             }
             nodes := make([]int64, 0, len(o.Nodes))
@@ -43,35 +43,31 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way,
                 nodes = append(nodes, int64(v.ID))
             }
-            w := Way{
-                OsmID: int64(o.ID),
+            w := Object{
+                ID: ID{ID: int64(o.ID), Type: WayType, Version: o.Version},
                 Tags: convertTags(o.Tags),
-                Nodes: nodes,
                 Timestamp: o.Timestamp,
-                Version: o.Version,
-                Visible: o.Visible,
+                Nodes: nodes,
             }
-            waysCh <- w
+            insertCh <- w
         case *osm.Node:
-            if !layersToImport["nodes"] {
+            if !layersToImport["nodes"] || !o.Visible {
                 continue
             }
-            n := Node{
-                OsmID: int64(o.ID),
+            w := Object{
+                ID: ID{ID: int64(o.ID), Type: NodeType, Version: o.Version},
+                Tags: convertTags(o.Tags),
+                Timestamp: o.Timestamp,
                 Location: Coords{
                     Type: "Point",
                     Coordinates: []float64{
                         o.Lon,
                         o.Lat,
                     }},
-                Tags: convertTags(o.Tags),
-                Version: o.Version,
-                Timestamp: o.Timestamp,
-                Visible: o.Visible,
             }
-            nodesCh <- n
+            insertCh <- w
         case *osm.Relation:
-            if !layersToImport["relations"] {
+            if !layersToImport["relations"] || !o.Visible {
                 continue
             }
             members := make([]Member, 0, len(o.Members))
@@ -87,22 +83,19 @@ func read(ctx context.Context, file string, nodesCh chan Node, waysCh chan Way,
                }
                members = append(members, Member{
                    Type: v.Type,
-                   Version: v.Version,
                    Orientation: v.Orientation,
                    Ref: v.Ref,
                    Role: v.Role,
                    Location: location,
                })
            }
-            r := Relation{
-                OsmID: int64(o.ID),
+            w := Object{
+                ID: ID{ID: int64(o.ID), Type: RelationType, Version: o.Version},
                 Tags: convertTags(o.Tags),
-                Version: o.Version,
                 Timestamp: o.Timestamp,
-                Visible: o.Visible,
                 Members: members,
             }
-            relationsCh <- r
+            insertCh <- w
         }
     }
     log.Println("Read done")
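The reader now drops invisible (deleted) elements at the top of each case, which is the "Filter deleted objects" part of the commit message, and funnels everything into the single `insertCh`. The scanner setup sits outside these hunks; a minimal, self-contained skeleton of such a loop with `github.com/paulmach/osm/osmpbf` is sketched below. Only the `o.Visible` check is taken from the diff; the rest is an assumption about how the surrounding loop is wired, not the project's exact code.

```go
package main

import (
    "context"
    "log"
    "os"

    "github.com/paulmach/osm"
    "github.com/paulmach/osm/osmpbf"
)

// scanVisible walks a PBF file and logs every node that is still visible,
// i.e. not deleted: the same o.Visible filter this commit adds to read().
func scanVisible(ctx context.Context, file string, procs int) error {
    f, err := os.Open(file)
    if err != nil {
        return err
    }
    defer f.Close()

    scanner := osmpbf.New(ctx, f, procs)
    defer scanner.Close()

    for scanner.Scan() {
        switch o := scanner.Object().(type) {
        case *osm.Node:
            if !o.Visible { // deleted objects carry visible=false
                continue
            }
            log.Printf("node %d v%d at %f,%f", o.ID, o.Version, o.Lat, o.Lon)
        }
    }
    return scanner.Err()
}
```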


@@ -9,93 +9,42 @@ import (
     "go.mongodb.org/mongo-driver/mongo/options"
 )
 
-func write(ctx context.Context, db *mongo.Database, nodesCh chan Node, waysCh chan Way, relationsCh chan Relation, initial bool, blockSize int, worker int) error {
-    nodes := db.Collection("nodes")
-    ways := db.Collection("ways")
-    relations := db.Collection("relations")
+func write(ctx context.Context, db *mongo.Database, insertCh chan Object, initial bool, blockSize int, worker int) error {
+    nodes := db.Collection("items")
     opts := (new(options.BulkWriteOptions)).SetOrdered(false)
-    nodesBuffer := make([]mongo.WriteModel, 0, blockSize)
-    waysBuffer := make([]mongo.WriteModel, 0, blockSize)
-    relationsBuffer := make([]mongo.WriteModel, 0, blockSize)
-    nc := 0
-    wc := 0
-    rc := 0
+    buf := make([]mongo.WriteModel, 0, blockSize)
+    ic := 0
     for {
         select {
-        case w := <-waysCh:
+        case w := <-insertCh:
             if initial {
                 um := mongo.NewInsertOneModel()
                 um.SetDocument(w)
-                waysBuffer = append(waysBuffer, um)
+                buf = append(buf, um)
             } else {
                 um := mongo.NewUpdateOneModel()
                 um.SetUpsert(true)
                 um.SetUpdate(w)
-                um.SetFilter(bson.M{"osm_id": w.OsmID})
-                waysBuffer = append(waysBuffer, um)
-            }
-        case n := <-nodesCh:
-            if initial {
-                um := mongo.NewInsertOneModel()
-                um.SetDocument(n)
-                nodesBuffer = append(nodesBuffer, um)
-            } else {
-                um := mongo.NewUpdateOneModel()
-                um.SetUpsert(true)
-                um.SetUpdate(n)
-                um.SetFilter(bson.M{"osm_id": n.OsmID})
-                nodesBuffer = append(nodesBuffer, um)
-            }
-        case r := <-relationsCh:
-            if initial {
-                um := mongo.NewInsertOneModel()
-                um.SetDocument(r)
-                relationsBuffer = append(relationsBuffer, um)
-            } else {
-                um := mongo.NewUpdateOneModel()
-                um.SetUpsert(true)
-                um.SetUpdate(r)
-                um.SetFilter(bson.M{"osm_id": r.OsmID})
-                relationsBuffer = append(relationsBuffer, um)
+                um.SetFilter(bson.M{"osm_id": w.ID})
+                buf = append(buf, um)
             }
         case <-ctx.Done():
-            log.Printf("[%d] saving last info in buffers...", worker)
-            if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
-                return err
+            if len(buf) > 0 {
+                log.Printf("Worker: %d\tSaving last info in buffers...", worker)
+                if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil {
+                    return err
+                }
             }
-            if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
-                return err
-            }
-            if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
-                return err
-            }
-            log.Printf("[%d] Done", worker)
+            log.Printf("Worker: %d\tDone", worker)
             return nil
         }
-        if len(nodesBuffer) == blockSize {
-            nc++
-            log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
-            if _, err := nodes.BulkWrite(context.Background(), nodesBuffer, opts); err != nil {
+        if len(buf) == blockSize {
+            ic++
+            log.Printf("Worker: %d\tWriting block %d (%d objects)", worker, ic, ic*blockSize)
+            if _, err := nodes.BulkWrite(context.Background(), buf, opts); err != nil {
                 return err
             }
-            nodesBuffer = make([]mongo.WriteModel, 0)
-        }
-        if len(waysBuffer) == blockSize {
-            wc++
-            log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
-            if _, err := ways.BulkWrite(context.Background(), waysBuffer, opts); err != nil {
-                return err
-            }
-            waysBuffer = make([]mongo.WriteModel, 0)
-        }
-        if len(relationsBuffer) == blockSize {
-            rc++
-            log.Printf("[%d] nodes %d ways %d relations %d", worker, nc, wc, rc)
-            if _, err := relations.BulkWrite(context.Background(), relationsBuffer, opts); err != nil {
-                return err
-            }
-            relationsBuffer = make([]mongo.WriteModel, 0)
+            buf = make([]mongo.WriteModel, 0)
         }
     }
 }
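Once the importer has run, the point of the single collection is that Overpass-style lookups become one query against one geo index. The sketch below shows a nearby-nodes lookup through the 2dsphere index on `location`. Assumptions: it targets the `items` collection used by `write()` above (note that `createIndexes` in this same commit targets `objects`, so adjust the name to whichever the indexes were actually built on), and it returns raw `bson.M` documents rather than decoding into `Object`. This is an illustration of querying the resulting data, not code from the repository.

```go
package main

import (
    "context"
    "fmt"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

// nearbyNodes finds up to ten nodes within radius meters of (lon, lat) using
// the 2dsphere index that createIndexes builds on the "location" field.
func nearbyNodes(ctx context.Context, db *mongo.Database, lon, lat, radius float64) error {
    filter := bson.M{
        "_id.type": "node", // the compound _id stores the object type
        "location": bson.M{
            "$nearSphere": bson.M{
                "$geometry": bson.M{
                    "type":        "Point",
                    "coordinates": []float64{lon, lat},
                },
                "$maxDistance": radius, // meters
            },
        },
    }
    cur, err := db.Collection("items").Find(ctx, filter, options.Find().SetLimit(10))
    if err != nil {
        return err
    }
    defer cur.Close(ctx)

    for cur.Next(ctx) {
        var doc bson.M
        if err := cur.Decode(&doc); err != nil {
            return err
        }
        fmt.Println(doc["_id"], doc["tags"])
    }
    return cur.Err()
}
```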