Skip to content

Commit 49db4d4

Browse files
committed
Add new MongoDB connector
1 parent 924cd6a commit 49db4d4

8 files changed

Lines changed: 386 additions & 0 deletions

File tree

mongodb-neo4j-connector/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
*.pyc
2+
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from pkgutil import extend_path
2+
__path__ = extend_path(__path__, __name__)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from pkgutil import extend_path
2+
__path__ = extend_path(__path__, __name__)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import re
2+
import logging
3+
4+
from py2neo import GraphError as graph_error, BindError as bind_error
5+
from py2neo.cypher.error import ClientError as client_error
6+
from py2neo.cypher import CypherTransactionError as cypher_transaction_error
7+
from py2neo.cypher.error.statement import InvalidType as invalid_type
8+
from py2neo.cypher import CypherError as cypher_error
9+
from py2neo.cypher.error.request import Invalid as invalid
10+
from py2neo.cypher.error.request import InvalidFormat as invalid_format
11+
from py2neo.cypher.error.schema import ConstraintAlreadyExists as constraint_already_exists
12+
from py2neo.cypher.error.schema import ConstraintVerificationFailure as constraint_verification_failure
13+
from py2neo.cypher.error.schema import ConstraintViolation as constraint_violation
14+
from py2neo.cypher.error.schema import IllegalTokenName as illegal_token_name
15+
from py2neo.cypher.error.schema import IndexAlreadyExists as index_already_exists
16+
from py2neo.cypher.error.schema import IndexBelongsToConstraint as index_belongs_to_constraint
17+
from py2neo.cypher.error.schema import LabelLimitReached as label_limit_reached
18+
from py2neo.cypher.error.schema import NoSuchConstraint as no_such_constraint
19+
from py2neo.cypher.error.schema import NoSuchIndex as no_such_index
20+
from py2neo.cypher.error.statement import ArithmeticError as statement_arithmetic_error
21+
from py2neo.cypher.error.statement import ConstraintViolation as statement_constraint_violation
22+
from py2neo.cypher.error.statement import EntityNotFound as entity_not_found
23+
from py2neo.cypher.error.statement import InvalidArguments as invalid_arguments
24+
from py2neo.cypher.error.statement import InvalidSemantics as invalid_semantics
25+
from py2neo.cypher.error.statement import InvalidSyntax as invalid_syntax
26+
from py2neo.cypher.error.statement import NoSuchLabel as no_such_label
27+
from py2neo.cypher.error.statement import NoSuchProperty as no_such_property
28+
from py2neo.cypher.error.statement import ParameterMissing as parameter_missing
29+
from py2neo.cypher.error.transaction import ConcurrentRequest as concurrent_request
30+
from py2neo.cypher.error.transaction import EventHandlerThrewException as event_handler_threw_exception
31+
from py2neo.cypher.error.transaction import InvalidType as transaction_invalid_type
32+
from py2neo.cypher.error.transaction import UnknownId as unknown_id
33+
from py2neo.cypher import DatabaseError as database_error
34+
from py2neo.cypher.error.schema import ConstraintCreationFailure as constraint_creation_failure
35+
from py2neo.cypher.error.schema import ConstraintDropFailure as constraint_drop_failure
36+
from py2neo.cypher.error.schema import IndexCreationFailure as index_creation_failure
37+
from py2neo.cypher.error.schema import IndexDropFailure as index_drop_failure
38+
from py2neo.cypher.error.schema import NoSuchLabel as schema_no_such_label
39+
from py2neo.cypher.error.schema import NoSuchPropertyKey as schema_no_such_property
40+
from py2neo.cypher.error.schema import NoSuchRelationshipType as no_such_relationship_type
41+
from py2neo.cypher.error.schema import NoSuchSchemaRule as no_such_schema_rule
42+
from py2neo.cypher.error.statement import ExecutionFailure as execution_failure
43+
from py2neo.cypher.error.transaction import CouldNotBegin as could_not_begin
44+
from py2neo.cypher.error.transaction import CouldNotCommit as could_not_commit
45+
from py2neo.cypher.error.transaction import CouldNotRollback as could_not_rollback
46+
from py2neo.cypher.error.transaction import ReleaseLocksFailed as release_locks_failed
47+
from py2neo.cypher import TransientError as transient_error
48+
from py2neo.cypher.error.network import UnknownFailure as unknown_failure
49+
from py2neo.cypher.error.statement import ExternalResourceFailure as external_resource_failure
50+
from py2neo.cypher.error.transaction import AcquireLockTimeout as acquire_lock_timeout
51+
52+
from mongo_connector import errors
53+
from mongo_connector.errors import OperationFailed
54+
55+
56+
LOG = logging.getLogger(__name__)
57+
58+
class Neo4jOperationFailed(OperationFailed):
    """Signal that a command failed on the destination database.

    Derives from mongo-connector's ``OperationFailed`` and adds no behaviour
    of its own; it only gives Neo4j failures a distinct type.
    """
62+
63+
class ErrorHandler(object):
    """Hold the table used to translate driver errors for mongo-connector.

    ``error_hash`` maps an exception class (py2neo errors plus a few Python
    built-ins) to the exception class that should be raised in its place.
    Every class is listed individually, including classes from the same
    py2neo error family.
    """

    def __init__(self):
        # The two leading entries, in their original order; a connection
        # problem is the only case reported as ConnectionFailed.
        translation = {
            graph_error: Neo4jOperationFailed,
            bind_error: errors.ConnectionFailed,
        }

        # Everything else is reported as a failed Neo4j operation.
        operation_failures = (
            invalid_type,
            cypher_transaction_error,
            cypher_error,
            client_error,
            invalid,
            invalid_format,
            constraint_already_exists,
            constraint_verification_failure,
            constraint_violation,
            illegal_token_name,
            index_already_exists,
            index_belongs_to_constraint,
            label_limit_reached,
            no_such_constraint,
            no_such_index,
            statement_arithmetic_error,
            statement_constraint_violation,
            entity_not_found,
            invalid_arguments,
            invalid_semantics,
            invalid_syntax,
            no_such_label,
            no_such_property,
            parameter_missing,
            concurrent_request,
            event_handler_threw_exception,
            transaction_invalid_type,
            unknown_id,
            database_error,
            constraint_creation_failure,
            constraint_drop_failure,
            index_creation_failure,
            index_drop_failure,
            schema_no_such_label,
            schema_no_such_property,
            no_such_relationship_type,
            no_such_schema_rule,
            execution_failure,
            could_not_begin,
            could_not_commit,
            could_not_rollback,
            release_locks_failed,
            transient_error,
            unknown_failure,
            external_resource_failure,
            acquire_lock_timeout,
            AttributeError,
            TypeError,
            NameError,
            RuntimeError,
        )
        translation.update(
            dict.fromkeys(operation_failures, Neo4jOperationFailed))

        self.error_hash = translation
120+
121+
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""
2+
Neo4j implementation for the DocManager. Receives documents and
3+
communicates with Neo4j Server using doc2graph cypher api.
4+
"""
5+
import base64
6+
import logging
7+
import os
8+
import os.path as path, sys
9+
10+
import bson.json_util
11+
from bson.json_util import dumps
12+
13+
from mongo_connector.doc_managers.error_handler import ErrorHandler
14+
15+
from py2neo import Graph, authenticate
16+
17+
18+
from mongo_connector import errors
19+
from mongo_connector.compat import u
20+
from mongo_connector.constants import (DEFAULT_COMMIT_INTERVAL,
21+
DEFAULT_MAX_BULK)
22+
from mongo_connector.util import exception_wrapper, retry_until_ok
23+
from mongo_connector.doc_managers.doc_manager_base import DocManagerBase
24+
25+
errors_handler = ErrorHandler()
26+
wrap_exceptions = exception_wrapper(errors_handler.error_hash)
27+
28+
LOG = logging.getLogger(__name__)
29+
30+
class DocManager(DocManagerBase):
    """Neo4j implementation of the mongo-connector DocManager.

    Receives documents from mongo-connector and forwards them to a Neo4j
    server through the doc2graph Cypher procedures ``json.upsert`` and
    ``json.delete``.
    """

    def __init__(self, url, auto_commit_interval=DEFAULT_COMMIT_INTERVAL,
                 unique_key='_id', chunk_size=DEFAULT_MAX_BULK, **kwargs):
        """Open the graph at *url* and store the synchronisation settings.

        :param url: address handed to ``py2neo.Graph``.
        :param auto_commit_interval: stored as given; nothing in this class
            reads it apart from ``stop()`` clearing it.
        :param unique_key: stored as given. NOTE(review): the statements
            below always key on ``_id``; confirm whether this should be used.
        :param chunk_size: maximum number of statements ``bulk_upsert`` puts
            in one transaction.
        :param kwargs: only the ``clientOptions`` entry is kept.
        """
        self.graph = Graph(url)
        self.auto_commit_interval = auto_commit_interval
        self.unique_key = unique_key
        self.chunk_size = chunk_size
        self.kwargs = kwargs.get("clientOptions")

    def stop(self):
        """Clear the auto-commit interval.

        No background thread is started by this class, so there is nothing
        else to shut down.
        """
        self.auto_commit_interval = None

    @staticmethod
    def _upsert_statement(doc):
        """Return the ``(statement, parameters)`` pair that upserts *doc*.

        The ``_id`` is sent separately from the JSON payload. It is removed
        from a shallow copy so the caller's document is left untouched.
        """
        payload = doc.copy()
        doc_id = u(payload.pop("_id"))
        parameters = {"doc_id": doc_id, "doc": dumps(payload)}
        return "CALL json.upsert({doc_id},{doc})", parameters

    @wrap_exceptions
    def upsert(self, doc, namespace, timestamp):
        """Insert a single document into Neo4j.

        *namespace* and *timestamp* are accepted for interface compatibility
        and are not used.
        """
        statement, parameters = self._upsert_statement(doc)
        tx = self.graph.cypher.begin()
        tx.append(statement, parameters)
        tx.commit()

    @wrap_exceptions
    def bulk_upsert(self, docs, namespace, timestamp):
        """Insert multiple documents into Neo4j.

        Documents are sent in transactions of at most ``self.chunk_size``
        statements (``DEFAULT_MAX_BULK`` when that value is unset or not
        positive). *namespace* and *timestamp* are not used.
        """
        limit = self.chunk_size
        if not limit or limit < 1:
            limit = DEFAULT_MAX_BULK

        tx = self.graph.cypher.begin()
        pending = 0
        for doc in docs:
            statement, parameters = self._upsert_statement(doc)
            tx.append(statement, parameters)
            pending += 1
            if pending >= limit:
                # Flush the full chunk and start a new transaction.
                tx.commit()
                tx = self.graph.cypher.begin()
                pending = 0
        # Commit the remainder (possibly an empty transaction).
        tx.commit()

    @wrap_exceptions
    def update(self, document_id, update_spec, namespace, timestamp):
        """Apply an update to a document -- NOT IMPLEMENTED.

        FIXME: *update_spec* is ignored. The method only opens and commits
        an empty transaction, so updates never reach Neo4j.
        """
        tx = self.graph.cypher.begin()
        tx.commit()

    @wrap_exceptions
    def remove(self, document_id, namespace, timestamp):
        """Remove the document identified by *document_id* from Neo4j."""
        doc_id = u(document_id)
        statement = "CALL json.delete({doc_id})"
        params_dict = {"doc_id": doc_id}
        tx = self.graph.cypher.begin()
        tx.append(statement, params_dict)
        tx.commit()

    @wrap_exceptions
    def search(self, start_ts, end_ts):
        """Return ``Document`` nodes with ``start_ts <= _ts <= end_ts``.

        The bounds are passed as Cypher parameters rather than being
        formatted into the statement text.

        TODO: nothing in this class writes a ``_ts`` property, so this
        presumably matches nothing yet -- confirm against the procedures.
        """
        statement = ("MATCH (d:Document) "
                     "WHERE d._ts>={start_ts} AND d._ts<={end_ts} RETURN d")
        parameters = {"start_ts": start_ts, "end_ts": end_ts}
        return self.graph.cypher.execute(statement, parameters)

    def commit(self):
        """Placeholder: every write above commits its own transaction."""
        LOG.error("Commit")

    @wrap_exceptions
    def get_last_doc(self):
        """Get the most recently modified node from Neo4j -- NOT IMPLEMENTED.

        This method is used to help define a time window within which
        documents may be in conflict after a MongoDB rollback. It currently
        only logs and returns ``None``.
        """
        LOG.error("get_last_doc is not implemented")

    # FIXME remove???
    def handle_command(self, doc, namespace, timestamp):
        """Placeholder for database commands; currently has no effect."""
        # Extracts the database name from "db.collection"; the value is unused.
        db = namespace.split('.', 1)[0]
107+

mongodb-neo4j-connector/setup.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[bdist_wheel]
2+
universal = 1

mongodb-neo4j-connector/setup.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
2+
import os
3+
import sys
4+
5+
try:
    from setuptools import setup, find_packages
    from setuptools.extension import Extension
except ImportError:
    # setuptools is missing: bootstrap it, then import the same names from
    # it. distutils cannot stand in here because distutils.core does not
    # provide find_packages.
    from ez_setup import use_setuptools
    use_setuptools()
    from setuptools import setup, find_packages
    from setuptools.extension import Extension

extra_opts = {}

python_2 = sys.version_info < (3,)

# NOTE(review): the README text collected here is not passed to setup();
# the hard-coded "long_description" in package_metadata is what gets
# published. Confirm which of the two is intended.
try:
    with open("README.md", "r") as fd:
        extra_opts['long_description'] = fd.read()
except IOError:
    pass  # Install without README.md

packages = ["mongo_connector", "mongo_connector.doc_managers"]
package_metadata = {
    "name": "neo4j-json-doc-manager",
    "version": "0.0.1",
    "description": "Neo4j Json Doc manager for Mongo Connector",
    "long_description": "Neo4j Json Doc Manager is a tool that will import data from Mongodb to a "
                        "Neo4j graph structure, via Mongo-Connector, using doc2graph way.",
    "author": "LARUS",
    "author_email": "omar.rampado@larus-ba.it",
    "url": "https://github.com/neo4j-contrib/neo4j_doc_manager.git",  # FIXME change
    "entry_points": {
        "console_scripts": [
            'mongo-connector = mongo_connector.connector:main',
        ],
    },
    "packages": packages,
    "install_requires": ['mongo-connector>=2.1', 'py2neo==2.0.8', 'requests>=2.5.1'],
    "license": "Apache Software License",
    "classifiers": [
        "Development Status :: 5 - Production/Stable",  # FIXME change
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 2.7",
        "Programming Language :: Python :: 3.3",
        "Programming Language :: Python :: 3.4",
        "Topic :: Database",
        "Topic :: Software Development",
    ],
}

# The package ships no extension modules, so setup() is called once with the
# metadata only and any error it raises is allowed to surface.
setup(**package_metadata)
61+

mongodb-neo4j-connector/util.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright 2013-2014 MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""A set of utilities used throughout the mongo-connector
16+
"""
17+
18+
import logging
19+
import sys
20+
import time
21+
22+
from bson.timestamp import Timestamp
23+
from mongo_connector.compat import reraise
24+
25+
LOG = logging.getLogger(__name__)
26+
27+
28+
def exception_wrapper(mapping):
    """Build a decorator that re-raises mapped exceptions as another type.

    :param mapping: dict from exception class to the class that should be
        raised instead. The lookup uses the exact class of the raised
        exception, so a subclass of a listed class is only translated when
        it is listed itself.
    :returns: a decorator. The decorated function returns the original
        function's result and keeps its name and docstring. Exceptions
        without a mapping propagate unchanged; mapped ones are re-raised as
        the new type with the original value and traceback.
    """
    from functools import wraps

    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except:  # noqa: E722 - anything without a mapping is re-raised
                exc_type, exc_value, exc_tb = sys.exc_info()
                new_type = mapping.get(exc_type)
                if new_type is None:
                    raise
                reraise(new_type, exc_value, exc_tb)
        return wrapped
    return decorator
41+
42+
43+
def bson_ts_to_long(timestamp):
    """Pack a BSON timestamp into a single integer.

    The ``time`` field is shifted left by 32 bits and the ``inc`` field is
    added to it, following the BSON specification
    (http://bsonspec.org/#/specification).
    """
    shifted_seconds = timestamp.time << 32
    return shifted_seconds + timestamp.inc
50+
51+
52+
def long_to_bson_ts(val):
    """Unpack an integer into a BSON ``Timestamp``.

    The bits above the low 32 become the seconds value and the low 32 bits
    become the increment.
    """
    return Timestamp(val >> 32, val & 0xffffffff)
59+
60+
61+
def retry_until_ok(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` until it succeeds; return its result.

    After each failure the function sleeps for one second and tries again.
    Up to 120 retries are made (121 attempts in total). If the final attempt
    also fails, the failure is logged with its traceback and that exception
    is re-raised. Only ``Exception`` subclasses trigger a retry.
    """
    max_retries = 120

    failures = 0
    while True:
        try:
            return func(*args, **kwargs)
        except Exception:
            failures += 1
            if failures > max_retries:
                LOG.exception('Call to %s failed too many times in '
                              'retry_until_ok', func)
                raise
            time.sleep(1)
80+
81+
82+
def log_fatal_exceptions(func):
    """Decorate *func* so any ``Exception`` it raises is logged as fatal.

    The exception is logged with its traceback through ``LOG.exception`` and
    then re-raised unchanged. The wrapper returns whatever *func* returns
    and keeps its name and docstring.
    """
    from functools import wraps

    @wraps(func)
    def wrapped(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            LOG.exception("Fatal Exception")
            raise
    return wrapped

0 commit comments

Comments
 (0)