diff --git a/changelog.md b/changelog.md index c717af2..294b34b 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,9 @@ # CHANGELOG +## 4.1.6 +FIXED: +- database timeout in osm building + ## 4.1.5 FIXED : - psycopg2 version diff --git a/r2gg/__about__.py b/r2gg/__about__.py index b6bbe4c..d30d34c 100644 --- a/r2gg/__about__.py +++ b/r2gg/__about__.py @@ -34,7 +34,7 @@ __uri_tracker__ = f"{__uri_repository__}issues/" __uri__ = __uri_repository__ -__version__ = "4.1.5" +__version__ = "4.1.6" __version_info__ = tuple( [ int(num) if num.isdigit() else num diff --git a/r2gg/_database.py b/r2gg/_database.py index b2b278e..26020af 100644 --- a/r2gg/_database.py +++ b/r2gg/_database.py @@ -4,7 +4,6 @@ import psycopg2 from psycopg2 import OperationalError, DatabaseError, InterfaceError from psycopg2.extras import DictCursor -import logging TIMEOUT = int(getenv("SQL_STATEMENT_TIMEOUT", 0)) RETRY = int(getenv("SQL_STATEMENT_RETRY_ATTEMPTS", 3)) @@ -134,9 +133,9 @@ def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False) ) else: cursor.execute(query) - + count = cursor.rowcount - + while True: rows = cursor.fetchmany(batchsize) if not rows: @@ -145,7 +144,11 @@ def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False) rows = rows.pop() yield rows, count finally: - self._connection.commit() + if self._connection and self._connection.closed == 0: + try: + self._connection.commit() + except Exception: + pass # the method below should be used as a generator function otherwise use execute_update @database_retry_decorator diff --git a/r2gg/_pivot_to_osm.py b/r2gg/_pivot_to_osm.py index 10b9d24..5003ed1 100644 --- a/r2gg/_pivot_to_osm.py +++ b/r2gg/_pivot_to_osm.py @@ -66,59 +66,49 @@ def pivot_to_osm(config, source, db_configs, database: DatabaseManager, logger, attribs = {"version": "0.6", "generator": "r2gg"} with xf.element("osm", attribs): - # Récupération du nombre de nodes - number_of_nodes_query = f"SELECT COUNT(*) as cnt 
FROM {input_schema}.nodes" - row, _ = database.execute_select_fetch_one(number_of_nodes_query, show_duration=True) - nodesize = row["cnt"] - # Ecriture des nodes - batchsize = 500000 - offset = 0 - logger.info(f"Writing nodes: {nodesize} ways to write") + batchsize = 10000 + nd_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox']) + generator = database.execute_select_fetch_multiple(nd_query, show_duration=True, batchsize=batchsize) + logger.info(f"Writing nodes") st_nodes = time.time() - while offset < nodesize: - sql_query_nodes = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox']) - sql_query_nodes += " LIMIT {} OFFSET {}".format(batchsize, offset) - offset += batchsize - logger.info("Writing nodes") - gen = database.execute_select_fetch_multiple(sql_query_nodes, show_duration=True) - try: - for row, count in gen: + try: + rows, count = next(generator, (None, None)) + while rows: + for row in rows: nodeEl = writeNode(row, extraction_date) xf.write(nodeEl, pretty_print=True) - finally: - gen.close() - logger.info("%s / %s nodes ajoutés" % (offset, nodesize)) + rows, _ = next(generator,(None, None)) + finally: + generator.close() et_nodes = time.time() logger.info("Writing nodes ended. Elapsed time : %s seconds." 
% (et_nodes - st_nodes)) - # Récupération du nombre de ways - sql_query_edges_count = f"SELECT COUNT(*) as cnt FROM {input_schema}.edges" - row, _ = database.execute_select_fetch_one(sql_query_edges_count, show_duration=True) - edgesize = row["cnt"] - # Ecriture des ways - batchsize = 300000 - offset = 0 - logger.info(f"Writing ways: {edgesize} ways to write") + logger.info(f"Writing ways") + sql_query_edges = getQueryByTableAndBoundingBox( + f'{input_schema}.edges', + source['bbox'], + ['*', f'{input_schema}.inter_nodes(geom) as internodes'] + ) + generator = database.execute_select_fetch_multiple(sql_query_edges, show_duration=True, batchsize=batchsize) st_edges = time.time() - while offset < edgesize: - sql_query_edges = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*', - f'{input_schema}.inter_nodes(geom) as internodes']) - sql_query_edges += " LIMIT {} OFFSET {}".format(batchsize, offset) - offset += batchsize - for row, count in database.execute_select_fetch_multiple(sql_query_edges, show_duration=True): - wayEl = writeWay(row, extraction_date) - for node in row['internodes']: - vertexSequence = vertexSequence + 1 - node['id'] = vertexSequence - nodeEl = writeNode(node, extraction_date) - xf.write(nodeEl, pretty_print=True) - wayEl = writeWayNds(wayEl, row, row['internodes']) - wayEl = writeWayTags(wayEl, row) - xf.write(wayEl, pretty_print=True) - - logger.info("%s / %s ways ajoutés" % (offset, edgesize)) + try: + rows, count = next(generator, (None, None)) + while rows: + for row in rows: + wayEl = writeWay(row, extraction_date) + for node in row['internodes']: + vertexSequence = vertexSequence + 1 + node['id'] = vertexSequence + nodeEl = writeNode(node, extraction_date) + xf.write(nodeEl, pretty_print=True) + wayEl = writeWayNds(wayEl, row, row['internodes']) + wayEl = writeWayTags(wayEl, row) + xf.write(wayEl, pretty_print=True) + rows, _ = next(generator,(None, None)) + finally: + generator.close() et_edges = time.time() 
logger.info("Writing ways ended. Elapsed time : %s seconds." % (et_edges - st_edges)) diff --git a/r2gg/_pivot_to_pgr.py b/r2gg/_pivot_to_pgr.py index 1a0ba4f..0f4bea1 100644 --- a/r2gg/_pivot_to_pgr.py +++ b/r2gg/_pivot_to_pgr.py @@ -167,7 +167,7 @@ def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseMana database_out.execute_update(create_nodes) logger.info("Populating vertices") - nd_query = f"SELECT id, geom FROM {input_schema}.nodes;" + nd_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox'], ["id", "geom"]) # Insertion petit à petit -> plus performant # logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount)) st_execute = time.time() @@ -270,11 +270,9 @@ def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseMana # Insertion petit à petit -> plus performant # logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount)) st_execute = time.time() - percent = 0 try: rows, count = next(generator, (None, None)) while rows: - percent += 1000000 / count # Chaîne permettant l'insertion de valeurs via psycopg values_str = "" # Tuple des valuers à insérer