Florent Guillaume
2018-12-09 22:13:36 UTC
Message:
NXP-26323: avoid some ConcurrentUpdateException by updating rows in deterministic order
Repository: nuxeo
Branch: fix-NXP-26323-avoid-some-concurrent-update-exceptions-9.10
Author: Florent Guillaume <***@nuxeo.com>
Pusher: efge <***@nuxeo.com>
Date: 2018-12-09T22:13:07 UTC
URL: https://github.com/nuxeo/nuxeo/commit/90d54311c14b846c5d67c2a1c68a3382594d4785
JIRA: https://jira.nuxeo.com/browse/NXP-26323
Files:
A nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java
A nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java
M nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
M nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
M nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
M nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
diff --git a/nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java b/nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java
new file mode 100644
index 00000000000..f16a4ef1492
--- /dev/null
+++ b/nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java
@@ -0,0 +1,71 @@
+/*
+ * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Contributors:
+ * Florent Guillaume
+ */
+package org.nuxeo.common.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Batching utilities.
+ *
+ * @since 10.10
+ */
+public class BatchUtils {
+
+ private BatchUtils() {
+ // utility class
+ }
+
+ /**
+ * Takes the input {@code values} and batches successive values together in groups. All values in a given group
+ * have the same derived value. The derived value is computed using the {@code deriver} function. Two derived
+ * values are compared for equality using the {@code comparator}.
+ *
+ * @param <T> the type of the values
+ * @param <U> the type of the derived values
+ * @param values the input values
+ * @param deriver the function to compute a derived value
+ * @param comparator the equality test for derived values
+ * @return a list of pairs with a derived value and the corresponding batch
+ * @since 10.10
+ */
+ public static <T, U> List<Pair<U, List<T>>> groupByDerived(List<T> values, Function<T, U> deriver,
+ BiPredicate<U, U> comparator) {
+ List<Pair<U, List<T>>> result = new ArrayList<>();
+ U previousDerived = null;
+ List<T> batch = null;
+ for (T value : values) {
+ U derived = deriver.apply(value);
+ if (batch == null || !comparator.test(derived, previousDerived)) {
+ // start new batch
+ batch = new ArrayList<>();
+ result.add(Pair.of(derived, batch));
+ }
+ // add to current batch
+ batch.add(value);
+ previousDerived = derived;
+ }
+ return result;
+ }
+
+}
diff --git a/nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java b/nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java
new file mode 100644
index 00000000000..8b2f160c431
--- /dev/null
+++ b/nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java
@@ -0,0 +1,66 @@
+/*
+ * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Contributors:
+ * Florent Guillaume
+ */
+package org.nuxeo.common.utils;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+import org.nuxeo.common.utils.BatchUtils;
+
+public class TestBatchUtils {
+
+ @Test
+ public void testGroupByDerived() {
+ Function<String, String> deriver = s -> s.substring(0, 1); // first letter
+ BiPredicate<String, String> comparator = (a, b) -> a.equals(b);
+ List<String> values;
+ List<Pair<String, List<String>>> result;
+
+ values = asList();
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[]", result.toString());
+
+ values = asList("a1");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1])]", result.toString());
+
+ values = asList("a1", "a2", "a3");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1, a2, a3])]", result.toString());
+
+ values = asList("a1", "a2", "b3");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1, a2]), (b,[b3])]", result.toString());
+
+ values = asList("a1", "b2", "a3", "b4");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1]), (b,[b2]), (a,[a3]), (b,[b4])]", result.toString());
+
+ values = asList("a1", "a2", "b3", "c4", "a5", "a6", "a7", "c8");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1, a2]), (b,[b3]), (c,[c4]), (a,[a5, a6, a7]), (c,[c8])]", result.toString());
+ }
+
+}
diff --git a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
index b94189764f4..01d6b922b38 100644
--- a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
+++ b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
@@ -25,7 +25,7 @@
* <p>
* This class is sometimes used as a marker for an "absent" row in the database, when mixed with actual {@link Row}s.
*/
-public class RowId implements Serializable {
+public class RowId implements Serializable, Comparable<RowId> {
private static final long serialVersionUID = 1L;
@@ -71,6 +71,21 @@ private boolean equals(RowId other) {
return tableName.equals(other.tableName);
}
+ @Override
+ public int compareTo(RowId other) {
+ int cmp = tableName.compareTo(other.tableName);
+ if (cmp != 0) {
+ return cmp;
+ }
+ if (id instanceof String && other.id instanceof String) {
+ return ((String) id).compareTo((String) other.id);
+ } else if (id instanceof Long && other.id instanceof Long) {
+ return ((Long) id).compareTo((Long) other.id);
+ } else {
+ throw new UnsupportedOperationException("id=" + id + " other.id=" + other.id);
+ }
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + '(' + tableName + ", " + id + ')';
diff --git a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
index 6a49904b9b5..844200e59f6 100644
--- a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
+++ b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
@@ -69,7 +69,7 @@
/**
* A {@link Row} and a list of its keys that have to be updated.
*/
- public static final class RowUpdate implements Serializable {
+ public static final class RowUpdate implements Serializable, Comparable<RowUpdate> {
private static final long serialVersionUID = 1L;
public final Row row;
@@ -124,6 +124,11 @@ private boolean equal(RowUpdate other) {
return other.row.equals(row);
}
+ @Override
+ public int compareTo(RowUpdate other) {
+ return row.compareTo(other.row);
+ }
+
@Override
public String toString() {
String string = getClass().getSimpleName() + '(' + row + ", keys=" + keys + ')';
diff --git a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
index 5ba6b315147..3478d90b363 100644
--- a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
+++ b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
@@ -47,7 +47,8 @@
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.nuxeo.common.utils.BatchUtils;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.model.Delta;
@@ -474,19 +475,22 @@ protected void writeCreates(List<Row> creates) {
}
protected void writeUpdates(Set<RowUpdate> updates) {
+ // we want to write in a consistent order to avoid simple deadlocks between two transactions
+ // so we write by sorted tables, then in each table by sorted ids
// reorganize by table
Map<String, List<RowUpdate>> tableRows = new HashMap<String, List<RowUpdate>>();
for (RowUpdate rowu : updates) {
- List<RowUpdate> rows = tableRows.get(rowu.row.tableName);
- if (rows == null) {
- tableRows.put(rowu.row.tableName, rows = new LinkedList<RowUpdate>());
- }
- rows.add(rowu);
+ tableRows.computeIfAbsent(rowu.row.tableName, k -> new ArrayList<>()).add(rowu);
}
+ List<String> tables = new ArrayList<>();
+ tables.addAll(tableRows.keySet());
+ // sort tables by name
+ Collections.sort(tables);
// updates on each table
- for (Entry<String, List<RowUpdate>> en : tableRows.entrySet()) {
- String tableName = en.getKey();
- List<RowUpdate> rows = en.getValue();
+ for (String tableName : tables) {
+ List<RowUpdate> rows = tableRows.get(tableName);
+ // sort rows by id
+ Collections.sort(rows);
if (model.isCollectionFragment(tableName)) {
updateCollectionRows(tableName, rows);
} else {
@@ -589,20 +593,15 @@ protected void updateSimpleRows(String tableName, List<RowUpdate> rows) {
return;
}
- // reorganize by identical queries to allow batching
- Map<String, SQLInfoSelect> sqlToInfo = new HashMap<>();
- Map<String, List<RowUpdate>> sqlRowUpdates = new HashMap<>();
- for (RowUpdate rowu : rows) {
- SQLInfoSelect update = sqlInfo.getUpdateById(tableName, rowu);
- String sql = update.sql;
- sqlToInfo.put(sql, update);
- sqlRowUpdates.computeIfAbsent(sql, k -> new ArrayList<RowUpdate>()).add(rowu);
- }
+ // we want to allow batching, BUT we also want to keep the order of rows to avoid some deadlocks
+ // so we batch together successive row updates that use the same SQL
+ List<Pair<SQLInfoSelect, List<RowUpdate>>> batchedPairs = BatchUtils.groupByDerived(rows,
+ rowu -> sqlInfo.getUpdateById(tableName, rowu), (a, b) -> a.sql.equals(b.sql));
- for (Entry<String, List<RowUpdate>> en : sqlRowUpdates.entrySet()) {
- String sql = en.getKey();
- List<RowUpdate> rowUpdates = en.getValue();
- SQLInfoSelect update = sqlToInfo.get(sql);
+ // write by batch
+ for (Pair<SQLInfoSelect, List<RowUpdate>> pair : batchedPairs) {
+ SQLInfoSelect update = pair.getLeft();
+ List<RowUpdate> rowUpdates = pair.getRight();
boolean changeTokenEnabled = model.getRepositoryDescriptor().isChangeTokenEnabled();
boolean batched = supportsBatchUpdates && rowUpdates.size() > 1
&& (dialect.supportsBatchUpdateCount() || !changeTokenEnabled);
diff --git a/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java b/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
index 576e7c0cdfe..2d86dcdb623 100644
--- a/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
+++ b/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
@@ -44,8 +44,11 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.inject.Inject;
@@ -4954,4 +4957,96 @@ public void testOptimisticLockingWithParallelChange() throws Exception {
}
}
+ protected static class SavingJob implements Runnable {
+
+ protected final List<DocumentRef> docRefs;
+
+ protected final CyclicBarrier barrier;
+
+ protected Exception exc;
+
+ public SavingJob(List<DocumentModel> docs, CyclicBarrier barrier) {
+ docRefs = docs.stream().map(DocumentModel::getRef).collect(Collectors.toList());
+ this.barrier = barrier;
+ }
+
+ @Override
+ public void run() {
+ try {
+ barrier.await(5, TimeUnit.SECONDS);
+ TransactionHelper.runInTransaction(() -> {
+ try (CoreSession session = CoreInstance.openCoreSession(null)) {
+ for (DocumentRef docRef : docRefs) {
+ DocumentModel doc = session.getDocument(docRef);
+ doc.setPropertyValue("dc:title", "foo" + System.nanoTime());
+ session.saveDocument(doc);
+ }
+ barrier.await(5, TimeUnit.SECONDS);
+ session.save();
+ } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+ exc = e;
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Exception e) {
+ if (exc == null) {
+ exc = new Exception(docRefs.toString(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Test that two concurrent saves of sets of documents that have commonalities don't produce a deadlock because of
+ * the order of the writes.
+ * <p>
+ * The test saves in parallel random subsets of the same document list, in a random order.
+ */
+ @Test
+ public void testConcurrentSavesNoDeadlock() throws Exception {
+ final int DOCS = 10;
+ final int CONCUR = 2;
+ final int TRIES = 100;
+ List<DocumentModel> docs = new ArrayList<>(DOCS);
+ for (int i = 0; i < DOCS; i++) {
+ DocumentModel doc = session.createDocumentModel("/", "doc" + i, "File");
+ doc.setPropertyValue("dc:title", "doc" + i);
+ doc = session.createDocument(doc);
+ docs.add(doc);
+ }
+ for (int tries = 0; tries < TRIES; tries++) {
+ waitForAsyncCompletion();
+
+ CyclicBarrier barrier = new CyclicBarrier(CONCUR);
+ List<SavingJob> jobs = new ArrayList<>(CONCUR);
+ List<Thread> threads = new ArrayList<>(CONCUR);
+ for (int i = 0; i < CONCUR; i++) {
+ List<DocumentModel> list = new ArrayList<>(docs);
+ // randomize list
+ Collections.shuffle(list);
+ // truncate to a random number of elements (at least 2)
+ int s = random.nextInt(DOCS - 1) + 2;
+ list = list.subList(0, s);
+ SavingJob job = new SavingJob(list, barrier);
+ jobs.add(job);
+ Thread thread = new Thread(job);
+ threads.add(thread);
+ thread.start();
+ }
+ for (int i = 0; i < CONCUR; i++) {
+ threads.get(i).join();
+ }
+ Exception err = new Exception("");
+ for (int i = 0; i < CONCUR; i++) {
+ Exception e = jobs.get(i).exc;
+ if (e != null) {
+ err.addSuppressed(e);
+ }
+ }
+ if (err.getSuppressed().length != 0) {
+ throw err;
+ }
+ }
+ }
+
}
NXP-26323: avoid some ConcurrentUpdateException by updating rows in deterministic order
Repository: nuxeo
Branch: fix-NXP-26323-avoid-some-concurrent-update-exceptions-9.10
Author: Florent Guillaume <***@nuxeo.com>
Pusher: efge <***@nuxeo.com>
Date: 2018-12-09T22:13:07 UTC
URL: https://github.com/nuxeo/nuxeo/commit/90d54311c14b846c5d67c2a1c68a3382594d4785
JIRA: https://jira.nuxeo.com/browse/NXP-26323
Files:
A nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java
A nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java
M nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
M nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
M nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
M nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
diff --git a/nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java b/nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java
new file mode 100644
index 00000000000..f16a4ef1492
--- /dev/null
+++ b/nuxeo-common/src/main/java/org/nuxeo/common/utils/BatchUtils.java
@@ -0,0 +1,71 @@
+/*
+ * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Contributors:
+ * Florent Guillaume
+ */
+package org.nuxeo.common.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Batching utilities.
+ *
+ * @since 10.10
+ */
+public class BatchUtils {
+
+ private BatchUtils() {
+ // utility class
+ }
+
+ /**
+ * Takes the input {@code values} and batches successive values together in groups. All values in a given group
+ * have the same derived value. The derived value is computed using the {@code deriver} function. Two derived
+ * values are compared for equality using the {@code comparator}.
+ *
+ * @param <T> the type of the values
+ * @param <U> the type of the derived values
+ * @param values the input values
+ * @param deriver the function to compute a derived value
+ * @param comparator the equality test for derived values
+ * @return a list of pairs with a derived value and the corresponding batch
+ * @since 10.10
+ */
+ public static <T, U> List<Pair<U, List<T>>> groupByDerived(List<T> values, Function<T, U> deriver,
+ BiPredicate<U, U> comparator) {
+ List<Pair<U, List<T>>> result = new ArrayList<>();
+ U previousDerived = null;
+ List<T> batch = null;
+ for (T value : values) {
+ U derived = deriver.apply(value);
+ if (batch == null || !comparator.test(derived, previousDerived)) {
+ // start new batch
+ batch = new ArrayList<>();
+ result.add(Pair.of(derived, batch));
+ }
+ // add to current batch
+ batch.add(value);
+ previousDerived = derived;
+ }
+ return result;
+ }
+
+}
diff --git a/nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java b/nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java
new file mode 100644
index 00000000000..8b2f160c431
--- /dev/null
+++ b/nuxeo-common/src/test/java/org/nuxeo/common/utils/TestBatchUtils.java
@@ -0,0 +1,66 @@
+/*
+ * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Contributors:
+ * Florent Guillaume
+ */
+package org.nuxeo.common.utils;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+import org.nuxeo.common.utils.BatchUtils;
+
+public class TestBatchUtils {
+
+ @Test
+ public void testGroupByDerived() {
+ Function<String, String> deriver = s -> s.substring(0, 1); // first letter
+ BiPredicate<String, String> comparator = (a, b) -> a.equals(b);
+ List<String> values;
+ List<Pair<String, List<String>>> result;
+
+ values = asList();
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[]", result.toString());
+
+ values = asList("a1");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1])]", result.toString());
+
+ values = asList("a1", "a2", "a3");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1, a2, a3])]", result.toString());
+
+ values = asList("a1", "a2", "b3");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1, a2]), (b,[b3])]", result.toString());
+
+ values = asList("a1", "b2", "a3", "b4");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1]), (b,[b2]), (a,[a3]), (b,[b4])]", result.toString());
+
+ values = asList("a1", "a2", "b3", "c4", "a5", "a6", "a7", "c8");
+ result = BatchUtils.groupByDerived(values, deriver, comparator);
+ assertEquals("[(a,[a1, a2]), (b,[b3]), (c,[c4]), (a,[a5, a6, a7]), (c,[c8])]", result.toString());
+ }
+
+}
diff --git a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
index b94189764f4..01d6b922b38 100644
--- a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
+++ b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowId.java
@@ -25,7 +25,7 @@
* <p>
* This class is sometimes used as a marker for an "absent" row in the database, when mixed with actual {@link Row}s.
*/
-public class RowId implements Serializable {
+public class RowId implements Serializable, Comparable<RowId> {
private static final long serialVersionUID = 1L;
@@ -71,6 +71,21 @@ private boolean equals(RowId other) {
return tableName.equals(other.tableName);
}
+ @Override
+ public int compareTo(RowId other) {
+ int cmp = tableName.compareTo(other.tableName);
+ if (cmp != 0) {
+ return cmp;
+ }
+ if (id instanceof String && other.id instanceof String) {
+ return ((String) id).compareTo((String) other.id);
+ } else if (id instanceof Long && other.id instanceof Long) {
+ return ((Long) id).compareTo((Long) other.id);
+ } else {
+ throw new UnsupportedOperationException("id=" + id + " other.id=" + other.id);
+ }
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + '(' + tableName + ", " + id + ')';
diff --git a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
index 6a49904b9b5..844200e59f6 100644
--- a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
+++ b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/RowMapper.java
@@ -69,7 +69,7 @@
/**
* A {@link Row} and a list of its keys that have to be updated.
*/
- public static final class RowUpdate implements Serializable {
+ public static final class RowUpdate implements Serializable, Comparable<RowUpdate> {
private static final long serialVersionUID = 1L;
public final Row row;
@@ -124,6 +124,11 @@ private boolean equal(RowUpdate other) {
return other.row.equals(row);
}
+ @Override
+ public int compareTo(RowUpdate other) {
+ return row.compareTo(other.row);
+ }
+
@Override
public String toString() {
String string = getClass().getSimpleName() + '(' + row + ", keys=" + keys + ')';
diff --git a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
index 5ba6b315147..3478d90b363 100644
--- a/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
+++ b/nuxeo-core/nuxeo-core-storage-sql/nuxeo-core-storage-sql/src/main/java/org/nuxeo/ecm/core/storage/sql/jdbc/JDBCRowMapper.java
@@ -47,7 +47,8 @@
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.nuxeo.common.utils.BatchUtils;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.model.Delta;
@@ -474,19 +475,22 @@ protected void writeCreates(List<Row> creates) {
}
protected void writeUpdates(Set<RowUpdate> updates) {
+ // we want to write in a consistent order to avoid simple deadlocks between two transactions
+ // so we write by sorted tables, then in each table by sorted ids
// reorganize by table
Map<String, List<RowUpdate>> tableRows = new HashMap<String, List<RowUpdate>>();
for (RowUpdate rowu : updates) {
- List<RowUpdate> rows = tableRows.get(rowu.row.tableName);
- if (rows == null) {
- tableRows.put(rowu.row.tableName, rows = new LinkedList<RowUpdate>());
- }
- rows.add(rowu);
+ tableRows.computeIfAbsent(rowu.row.tableName, k -> new ArrayList<>()).add(rowu);
}
+ List<String> tables = new ArrayList<>();
+ tables.addAll(tableRows.keySet());
+ // sort tables by name
+ Collections.sort(tables);
// updates on each table
- for (Entry<String, List<RowUpdate>> en : tableRows.entrySet()) {
- String tableName = en.getKey();
- List<RowUpdate> rows = en.getValue();
+ for (String tableName : tables) {
+ List<RowUpdate> rows = tableRows.get(tableName);
+ // sort rows by id
+ Collections.sort(rows);
if (model.isCollectionFragment(tableName)) {
updateCollectionRows(tableName, rows);
} else {
@@ -589,20 +593,15 @@ protected void updateSimpleRows(String tableName, List<RowUpdate> rows) {
return;
}
- // reorganize by identical queries to allow batching
- Map<String, SQLInfoSelect> sqlToInfo = new HashMap<>();
- Map<String, List<RowUpdate>> sqlRowUpdates = new HashMap<>();
- for (RowUpdate rowu : rows) {
- SQLInfoSelect update = sqlInfo.getUpdateById(tableName, rowu);
- String sql = update.sql;
- sqlToInfo.put(sql, update);
- sqlRowUpdates.computeIfAbsent(sql, k -> new ArrayList<RowUpdate>()).add(rowu);
- }
+ // we want to allow batching, BUT we also want to keep the order of rows to avoid some deadlocks
+ // so we batch together successive row updates that use the same SQL
+ List<Pair<SQLInfoSelect, List<RowUpdate>>> batchedPairs = BatchUtils.groupByDerived(rows,
+ rowu -> sqlInfo.getUpdateById(tableName, rowu), (a, b) -> a.sql.equals(b.sql));
- for (Entry<String, List<RowUpdate>> en : sqlRowUpdates.entrySet()) {
- String sql = en.getKey();
- List<RowUpdate> rowUpdates = en.getValue();
- SQLInfoSelect update = sqlToInfo.get(sql);
+ // write by batch
+ for (Pair<SQLInfoSelect, List<RowUpdate>> pair : batchedPairs) {
+ SQLInfoSelect update = pair.getLeft();
+ List<RowUpdate> rowUpdates = pair.getRight();
boolean changeTokenEnabled = model.getRepositoryDescriptor().isChangeTokenEnabled();
boolean batched = supportsBatchUpdates && rowUpdates.size() > 1
&& (dialect.supportsBatchUpdateCount() || !changeTokenEnabled);
diff --git a/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java b/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
index 576e7c0cdfe..2d86dcdb623 100644
--- a/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
+++ b/nuxeo-core/nuxeo-core-test/src/test/java/org/nuxeo/ecm/core/TestSQLRepositoryAPI.java
@@ -44,8 +44,11 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.inject.Inject;
@@ -4954,4 +4957,96 @@ public void testOptimisticLockingWithParallelChange() throws Exception {
}
}
+ protected static class SavingJob implements Runnable {
+
+ protected final List<DocumentRef> docRefs;
+
+ protected final CyclicBarrier barrier;
+
+ protected Exception exc;
+
+ public SavingJob(List<DocumentModel> docs, CyclicBarrier barrier) {
+ docRefs = docs.stream().map(DocumentModel::getRef).collect(Collectors.toList());
+ this.barrier = barrier;
+ }
+
+ @Override
+ public void run() {
+ try {
+ barrier.await(5, TimeUnit.SECONDS);
+ TransactionHelper.runInTransaction(() -> {
+ try (CoreSession session = CoreInstance.openCoreSession(null)) {
+ for (DocumentRef docRef : docRefs) {
+ DocumentModel doc = session.getDocument(docRef);
+ doc.setPropertyValue("dc:title", "foo" + System.nanoTime());
+ session.saveDocument(doc);
+ }
+ barrier.await(5, TimeUnit.SECONDS);
+ session.save();
+ } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+ exc = e;
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Exception e) {
+ if (exc == null) {
+ exc = new Exception(docRefs.toString(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Test that two concurrent saves of sets of documents that have commonalities don't produce a deadlock because of
+ * the order of the writes.
+ * <p>
+ * The test saves in parallel random subsets of the same document list, in a random order.
+ */
+ @Test
+ public void testConcurrentSavesNoDeadlock() throws Exception {
+ final int DOCS = 10;
+ final int CONCUR = 2;
+ final int TRIES = 100;
+ List<DocumentModel> docs = new ArrayList<>(DOCS);
+ for (int i = 0; i < DOCS; i++) {
+ DocumentModel doc = session.createDocumentModel("/", "doc" + i, "File");
+ doc.setPropertyValue("dc:title", "doc" + i);
+ doc = session.createDocument(doc);
+ docs.add(doc);
+ }
+ for (int tries = 0; tries < TRIES; tries++) {
+ waitForAsyncCompletion();
+
+ CyclicBarrier barrier = new CyclicBarrier(CONCUR);
+ List<SavingJob> jobs = new ArrayList<>(CONCUR);
+ List<Thread> threads = new ArrayList<>(CONCUR);
+ for (int i = 0; i < CONCUR; i++) {
+ List<DocumentModel> list = new ArrayList<>(docs);
+ // randomize list
+ Collections.shuffle(list);
+ // truncate to a random number of elements (at least 2)
+ int s = random.nextInt(DOCS - 1) + 2;
+ list = list.subList(0, s);
+ SavingJob job = new SavingJob(list, barrier);
+ jobs.add(job);
+ Thread thread = new Thread(job);
+ threads.add(thread);
+ thread.start();
+ }
+ for (int i = 0; i < CONCUR; i++) {
+ threads.get(i).join();
+ }
+ Exception err = new Exception("");
+ for (int i = 0; i < CONCUR; i++) {
+ Exception e = jobs.get(i).exc;
+ if (e != null) {
+ err.addSuppressed(e);
+ }
+ }
+ if (err.getSuppressed().length != 0) {
+ throw err;
+ }
+ }
+ }
+
}
--
You received this message because you are subscribed to the Google Groups "ecm-checkins" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ecm-checkins+***@lists.nuxeo.com.
Visit this group at https://groups.google.com/a/lists.nuxeo.com/group/ecm-checkins/.
You received this message because you are subscribed to the Google Groups "ecm-checkins" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ecm-checkins+***@lists.nuxeo.com.
Visit this group at https://groups.google.com/a/lists.nuxeo.com/group/ecm-checkins/.