Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 603c8e7

Browse files
committedMay 14, 2025
GH2701: Fuseki Mod to list and abort running executions.
1 parent 3be1c43 commit 603c8e7

File tree

23 files changed

+2023
-7
lines changed

23 files changed

+2023
-7
lines changed
 
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.jena.sparql.exec.tracker;
20+
21+
import org.apache.jena.sparql.core.DatasetGraph;
22+
import org.apache.jena.sparql.core.DatasetGraphWrapper;
23+
import org.apache.jena.sparql.core.DatasetGraphWrapperView;
24+
import org.apache.jena.sparql.util.Context;
25+
26+
public class DatasetGraphWithExecTracker
27+
extends DatasetGraphWrapper
28+
implements DatasetGraphWrapperView
29+
{
30+
public static DatasetGraph wrap(DatasetGraph dsg) {
31+
DatasetGraph result;
32+
if (dsg instanceof DatasetGraphWithExecTracker) {
33+
result = dsg;
34+
} else {
35+
// Put an exec tracker into the dataset's context.
36+
Context context = dsg.getContext();
37+
ExecTracker.ensureTracker(context);
38+
result = new DatasetGraphWithExecTracker(dsg);
39+
}
40+
return result;
41+
}
42+
43+
protected DatasetGraphWithExecTracker(DatasetGraph dsg) {
44+
super(dsg);
45+
}
46+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.jena.sparql.exec.tracker;
20+
21+
import java.time.Duration;
22+
import java.time.Instant;
23+
import java.util.Collections;
24+
import java.util.IdentityHashMap;
25+
import java.util.Iterator;
26+
import java.util.Map.Entry;
27+
import java.util.Objects;
28+
import java.util.Set;
29+
import java.util.concurrent.ConcurrentNavigableMap;
30+
import java.util.concurrent.ConcurrentSkipListMap;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
33+
import org.apache.jena.sparql.SystemARQ;
34+
import org.apache.jena.sparql.util.Context;
35+
import org.apache.jena.sparql.util.Symbol;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
public class ExecTracker {
40+
private static final Logger logger = LoggerFactory.getLogger(ExecTracker.class);
41+
42+
public record StartRecord(long requestId, Instant timestamp, Object requestObject, Runnable abortAction) {}
43+
44+
public record CompletionRecord(StartRecord start, Instant timestamp, Throwable throwable) {
45+
public Duration duration() {
46+
return Duration.between(start.timestamp, timestamp);
47+
}
48+
49+
public boolean isSuccess() {
50+
return throwable == null;
51+
}
52+
}
53+
54+
private Set<ExecTrackerListener> eventListeners = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
55+
56+
protected AtomicLong nextId = new AtomicLong();
57+
protected ConcurrentNavigableMap<Long, StartRecord> idToStartRecord = new ConcurrentSkipListMap<>();
58+
protected int maxHistorySize = 1000;
59+
protected ConcurrentNavigableMap<Long, CompletionRecord> history = new ConcurrentSkipListMap<>();
60+
61+
public ConcurrentNavigableMap<Long, StartRecord> getActiveTasks() {
62+
return idToStartRecord;
63+
}
64+
65+
public ConcurrentNavigableMap<Long, CompletionRecord> getHistory() {
66+
return history;
67+
}
68+
69+
public void setMaxHistorySize(int maxHistorySize) {
70+
this.maxHistorySize = maxHistorySize;
71+
}
72+
73+
public long put(Object requestObject, Runnable abortAction) {
74+
long result = nextId.getAndIncrement();
75+
StartRecord record = new StartRecord(result, Instant.now(), requestObject, abortAction);
76+
idToStartRecord.put(result, record);
77+
broadcastStartEvent(record);
78+
return result;
79+
}
80+
81+
protected void trimHistory() {
82+
if (history.size() >= maxHistorySize) {
83+
Iterator<Entry<Long, CompletionRecord>> it = history.entrySet().iterator();
84+
while (history.size() >= maxHistorySize && it.hasNext()) {
85+
it.next();
86+
it.remove();
87+
}
88+
}
89+
}
90+
91+
public CompletionRecord remove(long id, Throwable t) {
92+
StartRecord startRecord = idToStartRecord.remove(id);
93+
CompletionRecord result = null;
94+
if (startRecord != null) {
95+
trimHistory();
96+
Instant now = Instant.now();
97+
result = new CompletionRecord(startRecord, now, t);
98+
// long requestId = startRecord.requestId();
99+
// history.put(now, result);
100+
history.put(id, result);
101+
broadcastCompletionEvent(result);
102+
}
103+
return result;
104+
}
105+
106+
protected void broadcastStartEvent(StartRecord startRecord) {
107+
for (ExecTrackerListener listener : eventListeners) {
108+
try {
109+
listener.onStart(startRecord);
110+
} catch (Throwable t) {
111+
if (logger.isWarnEnabled()) {
112+
logger.warn("Failure during event handler.", t);
113+
}
114+
}
115+
}
116+
}
117+
118+
protected void broadcastCompletionEvent(CompletionRecord completionRecord) {
119+
for (ExecTrackerListener listener : eventListeners) {
120+
try {
121+
listener.onComplete(completionRecord);
122+
} catch (Throwable t) {
123+
if (logger.isWarnEnabled()) {
124+
logger.warn("Failure during event handler.", t);
125+
}
126+
}
127+
}
128+
}
129+
130+
public Runnable addListener(ExecTrackerListener listener) {
131+
Objects.requireNonNull(listener);
132+
eventListeners.add(listener);
133+
return () -> eventListeners.remove(listener);
134+
}
135+
136+
public Set<ExecTrackerListener> getEventListeners() {
137+
return eventListeners;
138+
}
139+
140+
@Override
141+
public String toString() {
142+
return "Active: " + idToStartRecord.size() + ", History: " + history.size() + "/" + maxHistorySize;
143+
}
144+
145+
// --- ARQ Integration ---
146+
147+
public static final Symbol symTracker = SystemARQ.allocSymbol("execTracker");
148+
149+
public static ExecTracker getTracker(Context context) {
150+
return context.get(symTracker);
151+
}
152+
153+
public static ExecTracker requireTracker(Context context) {
154+
ExecTracker result = getTracker(context);
155+
Objects.requireNonNull("No ExecTracker registered in context");
156+
return result;
157+
}
158+
159+
public static ExecTracker ensureTracker(Context context) {
160+
ExecTracker result = context.computeIfAbsent(symTracker, sym -> new ExecTracker());
161+
return result;
162+
}
163+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.jena.sparql.exec.tracker;
20+
21+
import org.apache.jena.sparql.exec.tracker.ExecTracker.CompletionRecord;
22+
import org.apache.jena.sparql.exec.tracker.ExecTracker.StartRecord;
23+
24+
public interface ExecTrackerListener {
25+
void onStart(StartRecord startRecord);
26+
void onComplete(CompletionRecord endRecord);
27+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.jena.sparql.exec.tracker;
20+
21+
import org.apache.jena.sparql.engine.QueryEngineRegistry;
22+
import org.apache.jena.sparql.modify.UpdateEngineRegistry;
23+
import org.apache.jena.sys.JenaSubsystemLifecycle;
24+
25+
public class InitExecTracker
26+
implements JenaSubsystemLifecycle
27+
{
28+
@Override
29+
public void start() {
30+
QueryEngineRegistry queryReg = QueryEngineRegistry.get();
31+
init(queryReg);
32+
33+
UpdateEngineRegistry updateReg = UpdateEngineRegistry.get();
34+
init(updateReg);
35+
}
36+
37+
@Override
38+
public void stop() {
39+
}
40+
41+
public static void init(QueryEngineRegistry reg) {
42+
reg.add(new QueryEngineFactoryExecTracker());
43+
}
44+
45+
public static void init(UpdateEngineRegistry reg) {
46+
reg.add(new UpdateEngineFactoryExecTracker());
47+
}
48+
49+
@Override
50+
public int level() {
51+
// Register the 'wrapper engine factories' late
52+
// such that upon execution they are consulted early.
53+
return 1_000_000;
54+
}
55+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.jena.sparql.exec.tracker;
20+
21+
import org.apache.jena.atlas.io.IndentedWriter;
22+
import org.apache.jena.shared.PrefixMapping;
23+
import org.apache.jena.sparql.algebra.Op;
24+
import org.apache.jena.sparql.engine.Plan;
25+
import org.apache.jena.sparql.engine.QueryIterator;
26+
import org.apache.jena.sparql.serializer.SerializationContext;
27+
28+
public interface PlanWrapper
29+
extends Plan
30+
{
31+
Plan getDelegate();
32+
33+
@Override
34+
default void output(IndentedWriter out, SerializationContext sCxt) {
35+
getDelegate().output(out, sCxt);
36+
}
37+
38+
@Override
39+
default String toString(PrefixMapping pmap) {
40+
return getDelegate().toString(pmap);
41+
}
42+
43+
@Override
44+
default void output(IndentedWriter out) {
45+
getDelegate().output(out);
46+
}
47+
48+
@Override
49+
default void close() {
50+
getDelegate().close();
51+
}
52+
53+
@Override
54+
default Op getOp() {
55+
return getDelegate().getOp();
56+
}
57+
58+
@Override
59+
default QueryIterator iterator() {
60+
return getDelegate().iterator();
61+
}
62+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.jena.sparql.exec.tracker;
20+
21+
import org.apache.jena.sparql.engine.Plan;
22+
23+
public class PlanWrapperBase
24+
implements PlanWrapper
25+
{
26+
protected Plan delegate;
27+
28+
public PlanWrapperBase(Plan delegate) {
29+
super();
30+
this.delegate = delegate;
31+
}
32+
33+
@Override
34+
public Plan getDelegate() {
35+
return delegate;
36+
}
37+
}

0 commit comments

Comments
 (0)
Please sign in to comment.