Skip to content

Commit 0f947b6

Browse files
author
Gennadiy Dubina
committed
use direct ByteBuffer to store audio files
1 parent c777a43 commit 0f947b6

6 files changed

Lines changed: 65 additions & 100 deletions

File tree

network/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<dependency>
2828
<groupId>io.netty</groupId>
2929
<artifactId>netty-all</artifactId>
30-
<version>4.0.34.Final</version>
30+
<version>4.1.9.Final</version>
3131
</dependency>
3232
</dependencies>
3333

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
<module>core</module>
6868
<module>client</module>
6969
<module>controls</module>
70-
<module>docs</module>
70+
<!--<module>docs</module>-->
7171
<module>bootstrap</module>
7272
</modules>
7373

@@ -167,7 +167,7 @@
167167
<module>core</module>
168168
<module>controls</module>
169169
<module>client</module>
170-
<module>docs</module>
170+
<!--<module>docs</module>-->
171171
<module>bootstrap</module>
172172
</modules>
173173
<build>
@@ -197,7 +197,7 @@
197197
</execution>
198198
</executions>
199199
</plugin>
200-
<plugin>
200+
<!--<plugin>
201201
<groupId>org.apache.maven.plugins</groupId>
202202
<artifactId>maven-javadoc-plugin</artifactId>
203203
<version>2.8.1</version>
@@ -210,7 +210,7 @@
210210
</goals>
211211
</execution>
212212
</executions>
213-
</plugin>
213+
</plugin>-->
214214
</plugins>
215215
</build>
216216
</profile>

resources/mediaplayer/pom.xml

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,19 @@
9191
<artifactId>mbrola</artifactId>
9292
<version>${version.freetts}</version>
9393
</dependency>
94-
<dependency>
95-
<groupId>org.ehcache</groupId>
96-
<artifactId>ehcache</artifactId>
97-
<version>3.0.1</version>
98-
</dependency>
9994
<dependency>
10095
<groupId>org.slf4j</groupId>
10196
<artifactId>slf4j-log4j12</artifactId>
10297
<version>1.5.6</version>
10398
<scope>provided</scope>
10499
</dependency>
100+
<!-- https://mvnrepository.com/artifact/io.netty/netty-buffer -->
101+
<dependency>
102+
<groupId>io.netty</groupId>
103+
<artifactId>netty-buffer</artifactId>
104+
<version>4.1.9.Final</version>
105+
</dependency>
106+
105107
<dependency>
106108
<groupId>commons-io</groupId>
107109
<artifactId>commons-io</artifactId>
Lines changed: 37 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,17 @@
11
package org.restcomm.media.resource.player.audio;
22

3+
import com.google.common.cache.*;
4+
import io.netty.buffer.ByteBuf;
5+
import io.netty.buffer.ByteBufInputStream;
6+
import io.netty.buffer.Unpooled;
37
import org.apache.commons.io.IOUtils;
48
import org.apache.log4j.Logger;
5-
import org.ehcache.Cache;
6-
import org.ehcache.CacheManager;
7-
import org.ehcache.config.builders.CacheConfigurationBuilder;
8-
import org.ehcache.config.builders.CacheManagerBuilder;
9-
import org.ehcache.config.builders.ResourcePoolsBuilder;
10-
import org.ehcache.config.units.MemoryUnit;
119

12-
import java.io.ByteArrayInputStream;
1310
import java.io.IOException;
1411
import java.io.InputStream;
1512
import java.net.URL;
16-
import java.util.concurrent.ConcurrentHashMap;
17-
import java.util.concurrent.locks.Lock;
18-
import java.util.concurrent.locks.ReentrantLock;
13+
import java.util.Map;
14+
import java.util.concurrent.Callable;
1915

2016
/**
2117
* Created by achikin on 5/9/16.
@@ -24,90 +20,46 @@ public class CachedRemoteStreamProvider implements RemoteStreamProvider {
2420

2521
private final static Logger log = Logger.getLogger(CachedRemoteStreamProvider.class);
2622

27-
private CacheManager cacheManager;
28-
29-
private ConcurrentHashMap<String, ByteStreamDownloader> inProgress = new ConcurrentHashMap<>();
23+
private Cache<String, ByteBuf> cache;
3024

3125
public CachedRemoteStreamProvider(int size) {
3226
log.info("Create AudioCache with size: " + size + "Mb");
33-
cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
34-
.withCache("preConfigured",
35-
CacheConfigurationBuilder.newCacheConfigurationBuilder(String.class, byte[].class,
36-
ResourcePoolsBuilder.newResourcePoolsBuilder().offheap(size, MemoryUnit.MB))
37-
.build())
38-
.build(true);
39-
}
40-
41-
private Cache<String, byte[]> getCache() {
42-
return cacheManager.getCache("preConfigured", String.class, byte[].class);
43-
}
44-
45-
public InputStream getStream(URL uri) throws IOException {
46-
String key = uri.toString();
47-
Cache<String, byte[]> cache = getCache();
48-
49-
byte[] stream = cache.get(key);
50-
if (stream == null) {
51-
stream = download(cache, uri);
52-
}
53-
54-
return new ByteArrayInputStream(stream);
55-
}
56-
57-
private byte[] download(Cache<String, byte[]> cache, final URL uri) throws IOException {
58-
String key = uri.toString();
59-
ByteStreamDownloader stream = inProgress.get(key);
60-
if (stream == null) {
61-
stream = new ByteStreamDownloader();
62-
ByteStreamDownloader prev = inProgress.putIfAbsent(key, stream);
63-
if (prev == null) {
64-
//check bytes in cache again too, maybe it's already added
65-
byte[] bytes = cache.get(key);
66-
if (bytes != null) {
67-
return bytes;
27+
cache = CacheBuilder.newBuilder().maximumWeight(size * 1024L * 1024L).weigher(new Weigher<String, ByteBuf>() {
28+
@Override
29+
public int weigh(String s, ByteBuf byteBuf) {
30+
return byteBuf.capacity();
31+
}
32+
}).removalListener(new RemovalListener<String, ByteBuf>() {
33+
@Override
34+
public void onRemoval(RemovalNotification<String, ByteBuf> removalNotification) {
35+
ByteBuf buf = removalNotification.getValue();
36+
if (buf != null) {
37+
buf.release();
6838
}
69-
} else {
70-
stream = prev;
7139
}
72-
}
40+
}).build();
41+
}
42+
43+
public InputStream getStream(final URL uri) throws IOException {
44+
final String key = uri.toString();
7345
try {
74-
byte[] bytes = stream.download(uri);
75-
if (bytes != null) {
76-
cache.putIfAbsent(key, bytes);
77-
} else {
78-
bytes = cache.get(key);
79-
}
80-
if (bytes == null) {
81-
throw new IOException("No data for " + uri);
82-
}
83-
return bytes;
84-
} finally {
85-
inProgress.remove(key);
46+
ByteBuf buf = cache.get(key, new Callable<ByteBuf>() {
47+
@Override
48+
public ByteBuf call() throws Exception {
49+
byte[] bytes = IOUtils.toByteArray(uri.openStream());
50+
return Unpooled.directBuffer(bytes.length).writeBytes(bytes);
51+
}
52+
});
53+
return new ByteBufInputStream(buf.retainedDuplicate(), true);
54+
} catch (Throwable e) {
55+
throw new IOException(e);
8656
}
8757
}
8858

89-
private static class ByteStreamDownloader {
90-
91-
private Lock lock = new ReentrantLock();
92-
93-
volatile boolean downloaded;
94-
95-
public byte[] download(final URL uri) throws IOException {
96-
if (downloaded) {
97-
return null;
98-
}
99-
lock.lock();
100-
try {
101-
//need to check twice
102-
if (downloaded) {
103-
return null;
104-
}
105-
byte[] bytes = IOUtils.toByteArray(uri.openStream());
106-
downloaded = bytes != null;
107-
return bytes;
108-
} finally {
109-
lock.unlock();
110-
}
59+
public void dump() {
60+
log.info("--- Cache dump ---");
61+
for (Map.Entry<String, ByteBuf> e : cache.asMap().entrySet()) {
62+
log.info(e.getKey() + "; " + e.getValue().refCnt());
11163
}
11264
}
11365
}

resources/mediaplayer/src/main/java/org/restcomm/media/resource/player/audio/RemoteStreamProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import java.io.IOException;
33
import java.io.InputStream;
44
import java.net.URL;
5+
import java.util.concurrent.ExecutionException;
56

67
/**
78
* Created by achikin on 5/9/16.

resources/mediaplayer/src/test/java/org/restcomm/media/resource/player/audio/wav/WavTrackCacheTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.mockito.stubbing.Answer;
2424
import org.restcomm.media.resource.player.audio.CachedRemoteStreamProvider;
2525
import org.restcomm.media.resource.player.audio.DirectRemoteStreamProvider;
26-
import org.restcomm.media.resource.player.audio.wav.WavTrackImpl;
2726
import org.restcomm.media.spi.format.EncodingName;
2827
import org.restcomm.media.spi.format.Format;
2928

@@ -68,42 +67,50 @@ public void testCache() throws IOException, UnsupportedAudioFileException {
6867
WavTrackImpl track1 = new WavTrackImpl(url1, cache);
6968
assertEquals(expectedFormat.getName(), track1.getFormat().getName());
7069
assertEquals(expectedDuration, track1.getDuration());
70+
track1.close();
7171

7272
WavTrackImpl track2 = new WavTrackImpl(url2, cache);
7373
assertEquals(expectedFormat.getName(), track2.getFormat().getName());
7474
assertEquals(expectedDuration, track2.getDuration());
75+
track2.close();
7576

7677
WavTrackImpl track3 = new WavTrackImpl(url2, cache);
7778
assertEquals(expectedFormat.getName(), track3.getFormat().getName());
7879
assertEquals(expectedDuration, track3.getDuration());
80+
track3.close();
7981

82+
cache.dump();
8083
verify(mockConnection).getInputStream();
8184
}
8285

8386
@Test
8487
public void testCacheOverflow() throws IOException, UnsupportedAudioFileException {
8588
//file size is 61712 bytes
8689
//1Mb cache contains have 15 full files
87-
int cacheSize = 1;
88-
double fileSize = 61712d;
89-
int iteration = (int) Math.floor(cacheSize * 1024d * 1024d / fileSize) - 1;
90-
90+
//int cacheSize = 1;
91+
//double fileSize = 61712d;
92+
//we have 4 segments in guava cache
93+
int iteration = 8;//(int) Math.floor(cacheSize * 1024d * 1024d / fileSize) - 1;
9194
CachedRemoteStreamProvider cache = new CachedRemoteStreamProvider(1);
9295

9396
for (int j = 0; j < 10; j++) {
97+
System.out.println("--- Iteration #: " + (j + 1));
9498
for (int i = 0; i < iteration; i++) {
9599
URL url = new URL(null, "http://test" + i + ".wav", handler);
96100
WavTrackImpl track = new WavTrackImpl(url, cache);
97101
assertEquals(expectedFormat.getName(), track.getFormat().getName());
98102
assertEquals(expectedDuration, track.getDuration());
103+
track.close();
99104
}
105+
cache.dump();
100106
}
101107
verify(mockConnection, Mockito.times(iteration)).getInputStream();
102108
for (int i = iteration; i < 2 * iteration; i++) {
103109
URL url = new URL(null, "http://test" + i + ".wav", handler);
104110
WavTrackImpl track = new WavTrackImpl(url, cache);
105111
assertEquals(expectedFormat.getName(), track.getFormat().getName());
106112
assertEquals(expectedDuration, track.getDuration());
113+
track.close();
107114
}
108115
verify(mockConnection, Mockito.times(2 * iteration)).getInputStream();
109116
}
@@ -118,14 +125,17 @@ public void testNoCache() throws IOException, UnsupportedAudioFileException {
118125
WavTrackImpl track1 = new WavTrackImpl(url1, noCache);
119126
assertEquals(expectedFormat.getName(), track1.getFormat().getName());
120127
assertEquals(expectedDuration, track1.getDuration());
128+
track1.close();
121129

122130
WavTrackImpl track2 = new WavTrackImpl(url2, noCache);
123131
assertEquals(expectedFormat.getName(), track2.getFormat().getName());
124132
assertEquals(expectedDuration, track2.getDuration());
133+
track2.close();
125134

126135
WavTrackImpl track3 = new WavTrackImpl(url2, noCache);
127136
assertEquals(expectedFormat.getName(), track3.getFormat().getName());
128137
assertEquals(expectedDuration, track3.getDuration());
138+
track3.close();
129139

130140
verify(mockConnection, times(3)).getInputStream();
131141
}

0 commit comments

Comments
 (0)