|
31 | 31 | import java.util.concurrent.ConcurrentHashMap; |
32 | 32 | import java.util.concurrent.ConcurrentMap; |
33 | 33 | import java.util.concurrent.ScheduledExecutorService; |
| 34 | +import java.util.concurrent.ScheduledFuture; |
34 | 35 | import java.util.concurrent.TimeUnit; |
35 | 36 | import java.util.concurrent.atomic.AtomicLong; |
36 | 37 |
|
@@ -132,9 +133,12 @@ public class ComponentRegistry |
132 | 133 |
|
133 | 134 | private final ScheduledExecutorService m_componentActor; |
134 | 135 |
|
| 136 | + private final UpdateChangeCountProperty m_updateChangeCountPropertyTask; |
| 137 | + |
135 | 138 | public ComponentRegistry(final ScrConfiguration scrConfiguration, final ScrLogger logger, final ScheduledExecutorService componentActor ) |
136 | 139 | { |
137 | 140 | m_configuration = scrConfiguration; |
| 141 | + m_updateChangeCountPropertyTask = new UpdateChangeCountProperty(m_configuration.serviceChangecountTimeout()); |
138 | 142 | m_logger = logger; |
139 | 143 | m_componentActor = componentActor; |
140 | 144 | m_componentHoldersByName = new HashMap<>(); |
@@ -716,44 +720,84 @@ public Dictionary<String, Object> getServiceRegistrationProperties() |
716 | 720 |
|
717 | 721 | public void setRegistration(final ServiceRegistration<ServiceComponentRuntime> reg) |
718 | 722 | { |
719 | | - long delay = m_configuration.serviceChangecountTimeout(); |
720 | | - m_componentActor.scheduleWithFixedDelay(new UpdateChangeCountProperty(reg), delay, delay, TimeUnit.MILLISECONDS); |
| 723 | + m_updateChangeCountPropertyTask.setRegistration(reg); |
| 724 | + m_updateChangeCountPropertyTask.schedule(); |
721 | 725 | } |
722 | 726 |
|
723 | 727 | class UpdateChangeCountProperty implements Runnable { |
724 | | - private final ServiceRegistration<ServiceComponentRuntime> registration; |
| 728 | + private volatile ServiceRegistration<ServiceComponentRuntime> registration; |
| 729 | + private final long maxNumberOfNoChanges; |
| 730 | + private final long delay; |
| 731 | + |
| 732 | + // guarded by this |
| 733 | + private int noChangesCount = 0; |
| 734 | + // guarded by this |
| 735 | + private ScheduledFuture<?> scheduledFuture = null; |
725 | 736 |
|
726 | | - public UpdateChangeCountProperty(ServiceRegistration<ServiceComponentRuntime> registration) |
| 737 | + public UpdateChangeCountProperty(long delay) |
727 | 738 | { |
728 | | - this.registration = registration; |
| 739 | + this.delay = delay; |
| 740 | + // calculate the max number of no changes; must be at least 1 to avoid missing events |
| 741 | + maxNumberOfNoChanges = Long.max(10000 / delay, 1); |
| 742 | + } |
| 743 | + |
| 744 | + public void setRegistration(ServiceRegistration<ServiceComponentRuntime> reg) |
| 745 | + { |
| 746 | + this.registration = reg; |
| 747 | + } |
| 748 | + |
| 749 | + public synchronized void schedule() |
| 750 | + { |
| 751 | + // reset noChangesCount to ensure task runs at least once more if it exists |
| 752 | + noChangesCount = 0; |
| 753 | + if (scheduledFuture != null) { |
| 754 | + return; |
| 755 | + } |
| 756 | + scheduledFuture = m_componentActor.scheduleWithFixedDelay(this , delay, delay, TimeUnit.MILLISECONDS); |
729 | 757 | } |
730 | 758 |
|
731 | 759 | @Override |
732 | 760 | public void run() |
733 | 761 | { |
| 762 | + ServiceRegistration<ServiceComponentRuntime> currentReg = registration; |
| 763 | + if (currentReg == null) { |
| 764 | + return; |
| 765 | + } |
734 | 766 | try { |
735 | | - Long registeredChangeCount = (Long) registration.getReference().getProperty(PROP_CHANGECOUNT); |
| 767 | + Long registeredChangeCount = (Long) currentReg.getReference().getProperty(PROP_CHANGECOUNT); |
736 | 768 | if (registeredChangeCount == null || registeredChangeCount.longValue() != changeCount.get()) { |
737 | 769 | try |
738 | 770 | { |
739 | | - registration.setProperties(getServiceRegistrationProperties()); |
| 771 | + currentReg.setProperties(getServiceRegistrationProperties()); |
740 | 772 | } |
741 | 773 | catch ( final IllegalStateException ise) |
742 | 774 | { |
743 | 775 | // we ignore this as this might happen on shutdown |
744 | 776 | } |
| 777 | + } else { |
| 778 | + synchronized (this) { |
| 779 | + noChangesCount++; |
| 780 | + if (noChangesCount > maxNumberOfNoChanges) { |
| 781 | + // haven't had any changes for max number of tries; |
| 782 | + // cancel the scheduled future if it exists. |
| 783 | + if (scheduledFuture != null) { |
| 784 | + scheduledFuture.cancel(false); |
| 785 | + scheduledFuture = null; |
| 786 | + } |
| 787 | + } |
| 788 | + } |
745 | 789 | } |
746 | 790 | } catch (Exception e) { |
747 | 791 | m_logger.log(Level.WARN, |
748 | 792 | "Service changecount update for {0} had a problem", e, |
749 | 793 | registration.getReference()); |
750 | 794 | } |
751 | 795 | } |
752 | | - |
753 | 796 | } |
754 | 797 |
|
755 | 798 | public void updateChangeCount() |
756 | 799 | { |
757 | 800 | this.changeCount.incrementAndGet(); |
| 801 | + m_updateChangeCountPropertyTask.schedule(); |
758 | 802 | } |
759 | 803 | } |
0 commit comments