11using System ;
2+ using System . Text ;
23using MongoDB . Driver ;
34using MongoDB . Driver . Builders ;
45
@@ -17,84 +18,99 @@ public class ScheduleRepository : IScheduleRepository
1718 {
1819 private readonly IScheduleRepositoryConfiguration configuration ;
1920 private readonly Func < DateTime > getNow ;
20- private readonly Lazy < MongoCollection < Schedule > > lazyCollection ;
21- private readonly Lazy < MongoServer > lazyServer ;
21+ private readonly Lazy < IMongoCollection < Schedule > > lazyCollection ;
2222
2323 public ScheduleRepository ( IScheduleRepositoryConfiguration configuration , Func < DateTime > getNow )
2424 {
2525 this . configuration = configuration ;
2626 this . getNow = getNow ;
27- lazyServer = new Lazy < MongoServer > ( Connect ) ;
28- lazyCollection = new Lazy < MongoCollection < Schedule > > ( CreateAndIndex ) ;
27+ lazyCollection = new Lazy < IMongoCollection < Schedule > > ( CreateAndIndex ) ;
2928 }
3029
31- private MongoServer Server => lazyServer . Value ;
32-
33- private MongoCollection < Schedule > Collection => lazyCollection . Value ;
30+ private IMongoCollection < Schedule > Collection => lazyCollection . Value ;
3431
3532 public void Store ( Schedule schedule )
3633 {
37- Collection . Insert ( schedule ) ;
34+ Collection . InsertOne ( schedule ) ;
3835 }
3936
4037 public void Cancel ( string cancellation )
4138 {
42- Collection . Remove ( Query < Schedule > . EQ ( x => x . CancellationKey , cancellation ) ) ;
39+ Collection . DeleteOne ( x => x . CancellationKey == cancellation ) ;
4340 }
4441
4542 public Schedule GetPending ( )
4643 {
4744 var now = getNow ( ) ;
48- var query = Query . And (
49- Query < Schedule > . EQ ( x => x . State , ScheduleState . Pending ) ,
50- Query < Schedule > . LTE ( x => x . WakeTime , now ) ) ;
51- var update = Update . Combine ( Update < Schedule > . Set ( x => x . State , ScheduleState . Publishing ) ,
52- Update < Schedule > . Set ( x => x . PublishingTime , now ) ) ;
53- var findAndModifyResult = Collection . FindAndModify ( new FindAndModifyArgs
54- {
55- Query = query ,
56- SortBy = SortBy < Schedule > . Ascending ( x => x . WakeTime ) ,
57- Update = update ,
58- VersionReturned = FindAndModifyDocumentVersion . Modified
59- } ) ;
60- return findAndModifyResult . GetModifiedDocumentAs < Schedule > ( ) ;
45+ var filter = Builders < Schedule > . Filter ;
46+ var query = filter . And (
47+ filter . Eq ( x => x . State , ScheduleState . Pending ) ,
48+ filter . Lte ( x => x . WakeTime , now ) ) ;
49+ var update = Builders < Schedule > . Update
50+ . Set ( x => x . State , ScheduleState . Publishing )
51+ . Set ( x => x . PublishingTime , now ) ; ;
52+ var options = new FindOneAndUpdateOptions < Schedule > { Sort = Builders < Schedule > . Sort . Ascending ( x => x . WakeTime ) , ReturnDocument = ReturnDocument . After } ;
53+ var findAndModifyResult = Collection . FindOneAndUpdate ( query , update , options ) ;
54+
55+ return findAndModifyResult ;
6156 }
6257
6358 public void MarkAsPublished ( Guid id )
6459 {
6560 var now = getNow ( ) ;
66- var query = Query . And ( Query < Schedule > . EQ ( x => x . Id , id ) ) ;
67- var update = Update . Combine ( Update < Schedule > . Set ( x => x . State , ScheduleState . Published ) ,
68- Update < Schedule > . Set ( x => x . PublishedTime , now ) ,
69- Update < Schedule > . Unset ( x => x . PublishingTime )
70- ) ;
71- Collection . Update ( query , update ) ;
61+ var update = Builders < Schedule > . Update
62+ . Set ( x => x . State , ScheduleState . Published )
63+ . Set ( x => x . PublishedTime , now )
64+ . Unset ( x => x . PublishingTime ) ;
65+ Collection . UpdateOne ( x => x . Id == id , update ) ;
7266 }
7367
7468 public void HandleTimeout ( )
7569 {
7670 var publishingTimeTimeout = getNow ( ) - configuration . PublishTimeout ;
77- var query = Query . And ( Query < Schedule > . EQ ( x => x . State , ScheduleState . Publishing ) ,
78- Query < Schedule > . LTE ( x => x . PublishingTime , publishingTimeTimeout ) ) ;
79- var update = Update . Combine ( Update < Schedule > . Set ( x => x . State , ScheduleState . Pending ) ,
80- Update < Schedule > . Unset ( x => x . PublishingTime ) ) ;
81- Collection . Update ( query , update , UpdateFlags . Multi ) ;
71+ var filter = Builders < Schedule > . Filter ;
72+ var query = filter . And ( filter . Eq ( x => x . State , ScheduleState . Publishing ) ,
73+ filter . Lte ( x => x . PublishingTime , publishingTimeTimeout ) ) ;
74+ var update = Builders < Schedule > . Update
75+ . Set ( x => x . State , ScheduleState . Pending )
76+ . Unset ( x => x . PublishingTime ) ;
77+ Collection . UpdateMany ( query , update ) ;
8278 }
8379
84- private MongoCollection < Schedule > CreateAndIndex ( )
80+ private IMongoCollection < Schedule > CreateAndIndex ( )
8581 {
86- var collection = Server . GetDatabase ( configuration . DatabaseName )
87- . GetCollection < Schedule > ( configuration . CollectionName ) ;
88- collection . CreateIndex ( IndexKeys < Schedule > . Ascending ( x => x . CancellationKey ) , IndexOptions . SetSparse ( true ) ) ;
89- collection . CreateIndex ( IndexKeys < Schedule > . Ascending ( x => x . State , x => x . WakeTime ) ) ;
90- collection . CreateIndex ( IndexKeys < Schedule > . Ascending ( x => x . PublishedTime ) ,
91- IndexOptions . SetTimeToLive ( configuration . DeleteTimeout ) . SetSparse ( true ) ) ;
82+ var collection = GetICollection < Schedule > ( configuration . ConnectionString , configuration . DatabaseName , configuration . CollectionName ) ;
83+ collection . Indexes . CreateOne ( new CreateIndexModel < Schedule > (
84+ Builders < Schedule > . IndexKeys . Ascending ( x => x . CancellationKey ) ,
85+ new CreateIndexOptions { Sparse = true } ) ) ;
86+ collection . Indexes . CreateOne ( new CreateIndexModel < Schedule > (
87+ Builders < Schedule > . IndexKeys
88+ . Ascending ( x => x . State )
89+ . Ascending ( x => x . WakeTime ) ) ) ;
90+
91+ collection . Indexes . CreateOne ( new CreateIndexModel < Schedule > (
92+ Builders < Schedule > . IndexKeys . Ascending ( x => x . PublishedTime ) ,
93+ new CreateIndexOptions
94+ {
95+ Sparse = true ,
96+ ExpireAfter = configuration . DeleteTimeout
97+ } ) ) ;
98+
9299 return collection ;
93100 }
94101
95- private MongoServer Connect ( )
102+ private static IMongoDatabase CreateDatabase ( MongoUrl connectionString , string databaseName )
103+ {
104+ var settings = MongoClientSettings . FromUrl ( connectionString ) ;
105+ settings . ReadEncoding = new UTF8Encoding ( false , false ) ;
106+ var client = new MongoClient ( settings ) ;
107+ return client . GetDatabase ( databaseName ) ;
108+ }
109+
110+ private static IMongoCollection < TDocument > GetICollection < TDocument > ( string connectionString , string databaseName , string collectionName )
96111 {
97- return new MongoClient ( configuration . ConnectionString ) . GetServer ( ) ;
112+ var database = CreateDatabase ( new MongoUrl ( connectionString ) , databaseName ) ;
113+ return database . GetCollection < TDocument > ( collectionName ) ;
98114 }
99115 }
100116}
0 commit comments