Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CHANGELOG

## 4.1.6
FIXED:
- database timeout in osm building

## 4.1.5
FIXED :
- psycopg2 version
Expand Down
2 changes: 1 addition & 1 deletion r2gg/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
__uri_tracker__ = f"{__uri_repository__}issues/"
__uri__ = __uri_repository__

__version__ = "4.1.5"
__version__ = "4.1.6"
__version_info__ = tuple(
[
int(num) if num.isdigit() else num
Expand Down
11 changes: 7 additions & 4 deletions r2gg/_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import psycopg2
from psycopg2 import OperationalError, DatabaseError, InterfaceError
from psycopg2.extras import DictCursor
import logging

TIMEOUT = int(getenv("SQL_STATEMENT_TIMEOUT", 0))
RETRY = int(getenv("SQL_STATEMENT_RETRY_ATTEMPTS", 3))
Expand Down Expand Up @@ -134,9 +133,9 @@ def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False)
)
else:
cursor.execute(query)

count = cursor.rowcount

while True:
rows = cursor.fetchmany(batchsize)
if not rows:
Expand All @@ -145,7 +144,11 @@ def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False)
rows = rows.pop()
yield rows, count
finally:
self._connection.commit()
if self._connection and self._connection.closed == 0:
try:
self._connection.commit()
except Exception:
pass

# the method below should be used as a generator function otherwise use execute_update
@database_retry_decorator
Expand Down
78 changes: 34 additions & 44 deletions r2gg/_pivot_to_osm.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,59 +66,49 @@ def pivot_to_osm(config, source, db_configs, database: DatabaseManager, logger,
attribs = {"version": "0.6", "generator": "r2gg"}
with xf.element("osm", attribs):

# Récupération du nombre de nodes
number_of_nodes_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.nodes"
row, _ = database.execute_select_fetch_one(number_of_nodes_query, show_duration=True)
nodesize = row["cnt"]

# Ecriture des nodes
batchsize = 500000
offset = 0
logger.info(f"Writing nodes: {nodesize} ways to write")
batchsize = 10000
nd_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox'])
generator = database.execute_select_fetch_multiple(nd_query, show_duration=True, batchsize=batchsize)
logger.info(f"Writing nodes")
st_nodes = time.time()
while offset < nodesize:
sql_query_nodes = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox'])
sql_query_nodes += " LIMIT {} OFFSET {}".format(batchsize, offset)
offset += batchsize
logger.info("Writing nodes")
gen = database.execute_select_fetch_multiple(sql_query_nodes, show_duration=True)
try:
for row, count in gen:
try:
rows, count = next(generator, (None, None))
while rows:
for row in rows:
nodeEl = writeNode(row, extraction_date)
xf.write(nodeEl, pretty_print=True)
finally:
gen.close()
logger.info("%s / %s nodes ajoutés" % (offset, nodesize))
rows, _ = next(generator,(None, None))
finally:
generator.close()
et_nodes = time.time()
logger.info("Writing nodes ended. Elapsed time : %s seconds." % (et_nodes - st_nodes))

# Récupération du nombre de ways
sql_query_edges_count = f"SELECT COUNT(*) as cnt FROM {input_schema}.edges"
row, _ = database.execute_select_fetch_one(sql_query_edges_count, show_duration=True)
edgesize = row["cnt"]

# Ecriture des ways
batchsize = 300000
offset = 0
logger.info(f"Writing ways: {edgesize} ways to write")
logger.info(f"Writing ways")
sql_query_edges = getQueryByTableAndBoundingBox(
f'{input_schema}.edges',
source['bbox'],
['*', f'{input_schema}.inter_nodes(geom) as internodes']
)
generator = database.execute_select_fetch_multiple(sql_query_edges, show_duration=True, batchsize=batchsize)
st_edges = time.time()
while offset < edgesize:
sql_query_edges = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*',
f'{input_schema}.inter_nodes(geom) as internodes'])
sql_query_edges += " LIMIT {} OFFSET {}".format(batchsize, offset)
offset += batchsize
for row, count in database.execute_select_fetch_multiple(sql_query_edges, show_duration=True):
wayEl = writeWay(row, extraction_date)
for node in row['internodes']:
vertexSequence = vertexSequence + 1
node['id'] = vertexSequence
nodeEl = writeNode(node, extraction_date)
xf.write(nodeEl, pretty_print=True)
wayEl = writeWayNds(wayEl, row, row['internodes'])
wayEl = writeWayTags(wayEl, row)
xf.write(wayEl, pretty_print=True)

logger.info("%s / %s ways ajoutés" % (offset, edgesize))
try:
rows, count = next(generator, (None, None))
while rows:
for row in rows:
wayEl = writeWay(row, extraction_date)
for node in row['internodes']:
vertexSequence = vertexSequence + 1
node['id'] = vertexSequence
nodeEl = writeNode(node, extraction_date)
xf.write(nodeEl, pretty_print=True)
wayEl = writeWayNds(wayEl, row, row['internodes'])
wayEl = writeWayTags(wayEl, row)
xf.write(wayEl, pretty_print=True)
rows, _ = next(generator,(None, None))
finally:
generator.close()
et_edges = time.time()
logger.info("Writing ways ended. Elapsed time : %s seconds." % (et_edges - st_edges))

Expand Down
4 changes: 1 addition & 3 deletions r2gg/_pivot_to_pgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseMana
database_out.execute_update(create_nodes)

logger.info("Populating vertices")
nd_query = f"SELECT id, geom FROM {input_schema}.nodes;"
nd_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox'], ["id", "geom"])
# Insertion petit à petit -> plus performant
# logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount))
st_execute = time.time()
Expand Down Expand Up @@ -270,11 +270,9 @@ def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseMana
# Insertion petit à petit -> plus performant
# logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount))
st_execute = time.time()
percent = 0
try:
rows, count = next(generator, (None, None))
while rows:
percent += 1000000 / count
# Chaîne permettant l'insertion de valeurs via psycopg
values_str = ""
# Tuple des valeurs à insérer
Expand Down
Loading