Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 120 additions & 27 deletions solr/core/src/java/org/apache/solr/update/UpdateLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -88,10 +91,12 @@
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.tracing.TraceUtils;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -107,6 +112,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public static String LOG_FILENAME_PATTERN = "%s.%019d";
public static String TLOG_NAME = "tlog";
public static String BUFFER_TLOG_NAME = "buffer.tlog";
private static final String UPDATELOG_REPLAY_SPAN_NAME = "updatelog.replay";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

honestly just inline these. The constants serve no purpose. I know this is a matter of taste. The practice of constants spreads readability around thus reducing readability.

private static final String UPDATELOG_REPLAY_LOG_SPAN_NAME = "updatelog.replay.log";

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean debug = log.isDebugEnabled();
Expand Down Expand Up @@ -2129,36 +2136,69 @@ public void run() {
// setting request info will help logging
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));

try {
for (; ; ) {
TransactionLog translog = translogs.pollFirst();
if (translog == null) break;
doReplay(translog);
}
} catch (SolrException e) {
if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
log.error("Replay failed service unavailable", e);
recoveryInfo.failed = true;
} else {
final int initialLogCount = translogs.size();
int logsReplayed = 0;
long replayedOps = 0;
final int replayErrorsStart = recoveryInfo.errors.get();
final Span replaySpan =
TraceUtils.getGlobalTracer().spanBuilder(UPDATELOG_REPLAY_SPAN_NAME).startSpan();
TraceUtils.ifNotNoop(
replaySpan,
span -> {
span.setAttribute("updatelog.replay.state", state.toString());
span.setAttribute("updatelog.replay.active_log", activeLog);
span.setAttribute("updatelog.replay.in_sorted_order", inSortedOrder);
span.setAttribute("updatelog.replay.logs_total", initialLogCount);
span.setAttribute("updatelog.replay.core", req.getCore().getName());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably remove, assuming redundant with the line below

TraceUtils.setDbInstance(span, req.getCore().getName());
});

try (Scope scope = replaySpan.makeCurrent()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid adding a try-finally when I see one here already?

assert scope != null;
try {
for (; ; ) {
TransactionLog translog = translogs.pollFirst();
if (translog == null) break;
replayedOps += doReplay(translog);
logsReplayed++;
}
} catch (SolrException e) {
if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
log.error("Replay failed service unavailable", e);
recoveryInfo.failed = true;
} else {
recoveryInfo.errors.incrementAndGet();
log.error("Replay failed due to exception", e);
}
replaySpan.recordException(e);
replaySpan.setStatus(StatusCode.ERROR);
} catch (Exception e) {
recoveryInfo.errors.incrementAndGet();
log.error("Replay failed due to exception", e);
replaySpan.recordException(e);
replaySpan.setStatus(StatusCode.ERROR);
} finally {
// change the state while updates are still blocked to prevent races
state = State.ACTIVE;
if (finishing) {
updateLocks.unblockUpdates();
}

// clean up in case we hit some unexpected exception and didn't get
// to more transaction logs
for (TransactionLog translog : translogs) {
log.error("ERROR: didn't get to recover from tlog {}", translog);
translog.decref();
}
}
} catch (Exception e) {
recoveryInfo.errors.incrementAndGet();
log.error("Replay failed due to exception", e);
} finally {
// change the state while updates are still blocked to prevent races
state = State.ACTIVE;
if (finishing) {
updateLocks.unblockUpdates();
}

// clean up in case we hit some unexpected exception and didn't get
// to more transaction logs
for (TransactionLog translog : translogs) {
log.error("ERROR: didn't get to recover from tlog {}", translog);
translog.decref();
if (replaySpan.isRecording()) {
replaySpan.setAttribute("updatelog.replay.logs_replayed", logsReplayed);
replaySpan.setAttribute("updatelog.replay.ops_replayed", replayedOps);
replaySpan.setAttribute(
"updatelog.replay.errors", recoveryInfo.errors.get() - replayErrorsStart);
}
replaySpan.end();
}

loglog.warn("Log replay finished. recoveryInfo={}", recoveryInfo);
Expand All @@ -2168,8 +2208,24 @@ public void run() {
SolrRequestInfo.clearRequestInfo();
}

public void doReplay(TransactionLog translog) {
try {
public long doReplay(TransactionLog translog) {
long replayedOps = 0L;
final int replayErrorsStart = recoveryInfo.errors.get();
final Span replayLogSpan =
TraceUtils.getGlobalTracer().spanBuilder(UPDATELOG_REPLAY_LOG_SPAN_NAME).startSpan();
TraceUtils.ifNotNoop(
replayLogSpan,
span -> {
if (translog.tlog != null) {
span.setAttribute("updatelog.replay.log_file", translog.tlog.getFileName().toString());
}
span.setAttribute("updatelog.replay.log_size_bytes", translog.getLogSize());
span.setAttribute("updatelog.replay.active_log", activeLog);
span.setAttribute("updatelog.replay.in_sorted_order", inSortedOrder);
});
boolean replayLogSucceeded = false;
try (Scope scope = replayLogSpan.makeCurrent()) {
assert scope != null;
loglog.warn(
"Starting log replay {} active={} starting pos={} inSortedOrder={}",
translog,
Expand All @@ -2192,6 +2248,11 @@ public void doReplay(TransactionLog translog) {

// Use a pool of URPs using a ThreadLocal to have them per-thread. URPs aren't threadsafe.
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
TraceUtils.ifNotNoop(
replayLogSpan,
span ->
span.setAttribute(
"updatelog.replay.urp_chain", summarizeProcessorChain(processorChain)));
Collection<UpdateRequestProcessor> procPool =
Collections.synchronizedList(new ArrayList<>());
ThreadLocal<UpdateRequestProcessor> procThreadLocal =
Expand Down Expand Up @@ -2287,6 +2348,7 @@ public void doReplay(TransactionLog translog) {
case UpdateLog.ADD:
{
recoveryInfo.adds++;
replayedOps++;
AddUpdateCommand cmd =
convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
Expand All @@ -2297,6 +2359,7 @@ public void doReplay(TransactionLog translog) {
case UpdateLog.DELETE:
{
recoveryInfo.deletes++;
replayedOps++;
byte[] idBytes = (byte[]) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.setIndexedId(new BytesRef(idBytes));
Expand All @@ -2310,6 +2373,7 @@ public void doReplay(TransactionLog translog) {
case UpdateLog.DELETE_BY_QUERY:
{
recoveryInfo.deleteByQuery++;
replayedOps++;
String query = (String) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.query = query;
Expand Down Expand Up @@ -2346,10 +2410,12 @@ public void doReplay(TransactionLog translog) {
} catch (ClassCastException cl) {
recoveryInfo.errors.incrementAndGet();
loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry={}", o, cl);
replayLogSpan.recordException(cl);
// would be caused by a corrupt transaction log
} catch (Exception ex) {
recoveryInfo.errors.incrementAndGet();
loglog.warn("REPLAY_ERR: Exception replaying log", ex);
replayLogSpan.recordException(ex);
// something wrong with the request?
}
assert TestInjection.injectUpdateLogReplayRandomPause();
Expand Down Expand Up @@ -2389,11 +2455,38 @@ public void doReplay(TransactionLog translog) {
IOUtils.closeQuietly(proc);
}
}
replayLogSucceeded = true;

} finally {
if (tlogReader != null) tlogReader.close();
translog.decref();
final int replayErrors = recoveryInfo.errors.get() - replayErrorsStart;
if (replayLogSpan.isRecording()) {
replayLogSpan.setAttribute("updatelog.replay.log_ops", replayedOps);
replayLogSpan.setAttribute("updatelog.replay.log_errors", replayErrors);
replayLogSpan.setAttribute("updatelog.replay.log_success", replayLogSucceeded);
}
if (!replayLogSucceeded || replayErrors > 0) {
replayLogSpan.setStatus(StatusCode.ERROR);
}
replayLogSpan.end();
}
return replayedOps;
}

private String summarizeProcessorChain(UpdateRequestProcessorChain processorChain) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets put this on UpdateRequestProcessorChain.toString().
BTW processorChain won't be null

if (processorChain == null) {
return "none";
}
List<UpdateRequestProcessorFactory> processors = processorChain.getProcessors();
if (processors == null || processors.isEmpty()) {
return "empty";
}
List<String> names = new ArrayList<>(processors.size());
for (UpdateRequestProcessorFactory processor : processors) {
names.add(processor.getClass().getSimpleName());
}
return String.join(">", names);
}

private void waitForAllUpdatesGetExecuted(AtomicInteger pendingTasks) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class UpdateLogReplayTracingTest extends SolrTestCaseJ4 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.... I would've been sufficiently happy to see a pic of it working in your tracing viewer of choice. We mostly don't test logs; traces are a glorified log in the end, and thus I think testing traces is questionable value trade-off.

private static final String FROM_LEADER = DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString();
private static InMemorySpanExporter spanExporter;
private static OpenTelemetrySdk openTelemetrySdk;
private static UpdateLog ulog;

@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig-tlog.xml", "schema-inplace-updates.xml");

GlobalOpenTelemetry.resetForTest();
spanExporter = InMemorySpanExporter.create();
SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();
openTelemetrySdk = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
GlobalOpenTelemetry.set(openTelemetrySdk);

try (var req = req()) {
UpdateHandler updateHandler = req.getCore().getUpdateHandler();
ulog = updateHandler.getUpdateLog();
assertNotNull("UpdateLog must be enabled for replay tracing tests", ulog);
}
}

@AfterClass
public static void afterClass() {
ulog = null;
if (openTelemetrySdk != null) {
openTelemetrySdk.close();
openTelemetrySdk = null;
}
if (spanExporter != null) {
spanExporter.close();
spanExporter = null;
}
GlobalOpenTelemetry.resetForTest();
}

@Test
public void testReplayEmitsParentAndPerLogSpans() throws Exception {
Span sanitySpan = GlobalOpenTelemetry.getTracer("solr").spanBuilder("sanity.span").startSpan();
sanitySpan.end();
assertTrue("Expected test tracer to record spans", spanExporter.getFinishedSpanItems().size() > 0);

SolrParams replayParams =
params(
DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM,
FROM_LEADER,
"distrib.from",
"http://127.0.0.1:8983/solr/test",
"distrib.inplace.update",
"false");

clearIndex();
ulog.bufferUpdates();
assertNotNull(
updateJ(jsonAdd(sdoc("id", "1", "title_s", "doc-1")), withVersion(replayParams, 1001L)));
assertNotNull(
updateJ(jsonAdd(sdoc("id", "2", "title_s", "doc-2")), withVersion(replayParams, 1002L)));

spanExporter.reset();
Future<UpdateLog.RecoveryInfo> replayFuture = ulog.applyBufferedUpdates();
assertNotNull("Expected buffered replay future", replayFuture);
UpdateLog.RecoveryInfo recoveryInfo = replayFuture.get(30, TimeUnit.SECONDS);
assertTrue("Expected replay to process buffered updates", recoveryInfo.adds > 0);

List<SpanData> spans = new ArrayList<>(spanExporter.getFinishedSpanItems());
SpanData replaySpan = findSpan(spans, "updatelog.replay");
SpanData replayLogSpan = findSpan(spans, "updatelog.replay.log");

assertNotNull("Expected parent replay span. span names=" + spanNames(spans), replaySpan);
assertNotNull("Expected per-log replay span. span names=" + spanNames(spans), replayLogSpan);
assertEquals(replaySpan.getSpanContext().getTraceId(), replayLogSpan.getSpanContext().getTraceId());
assertEquals(replaySpan.getSpanContext().getSpanId(), replayLogSpan.getParentSpanContext().getSpanId());

assertEquals(
h.getCore().getName(),
replaySpan.getAttributes().get(AttributeKey.stringKey("updatelog.replay.core")));
assertEquals(
Long.valueOf(1L),
replaySpan.getAttributes().get(AttributeKey.longKey("updatelog.replay.logs_replayed")));
assertEquals(
Long.valueOf(2L),
replayLogSpan.getAttributes().get(AttributeKey.longKey("updatelog.replay.log_ops")));
}

private SolrParams withVersion(SolrParams base, long version) {
ModifiableSolrParams versioned = new ModifiableSolrParams(base);
versioned.set("_version_", Long.toString(version));
return versioned;
}

private SpanData findSpan(List<SpanData> spans, String spanName) {
for (SpanData span : spans) {
if (spanName.equals(span.getName())) {
return span;
}
}
return null;
}

private String spanNames(List<SpanData> spans) {
List<String> names = new ArrayList<>(spans.size());
for (SpanData span : spans) {
names.add(span.getName());
}
return names.toString();
}
}