summaryrefslogtreecommitdiff
path: root/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
blob: 689b8eda9e2dba21b20ecad54cc73bf3f2d2fcc2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.KeeperException.SessionMovedException;
import org.apache.zookeeper.MultiOperationRecord;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.ReconfigRequest;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderBeanTest;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.txn.ErrorTxn;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;


public class PrepRequestProcessorTest extends ClientBase {

    private static final int CONNECTION_TIMEOUT = 3000;
    private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
    private CountDownLatch pLatch;

    private ZooKeeperServer zks;
    private ServerCnxnFactory servcnxnf;
    private PrepRequestProcessor processor;
    private Request outcome;

    private boolean isReconfigEnabledPreviously;
    private boolean isStandaloneEnabledPreviously;

    @BeforeEach
    public void setup() throws Exception {
        File tmpDir = ClientBase.createTmpDir();
        ClientBase.setupTestEnv();
        zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
        SyncRequestProcessor.setSnapCount(100);
        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);

        servcnxnf = ServerCnxnFactory.createFactory(PORT, -1);
        servcnxnf.startup(zks);
        assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT), "waiting for server being up ");
        zks.sessionTracker = new MySessionTracker();

        isReconfigEnabledPreviously = QuorumPeerConfig.isReconfigEnabled();
        isStandaloneEnabledPreviously = QuorumPeerConfig.isStandaloneEnabled();
    }

    @AfterEach
    public void teardown() throws Exception {
        if (servcnxnf != null) {
            servcnxnf.shutdown();
        }
        if (zks != null) {
            zks.shutdown();
        }

        // reset the reconfig option
        QuorumPeerConfig.setReconfigEnabled(isReconfigEnabledPreviously);
        QuorumPeerConfig.setStandaloneEnabled(isStandaloneEnabledPreviously);
    }

    @Test
    public void testPRequest() throws Exception {
        pLatch = new CountDownLatch(1);
        processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
        Request foo = new Request(null, 1L, 1, OpCode.create, RequestRecord.fromBytes(new byte[3]), null);
        processor.pRequest(foo);

        assertEquals(new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue()), outcome.getTxn(), "Request should have marshalling error");
        assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain");
    }

    private Request createRequest(Record record, int opCode) {
        return createRequest(record, opCode, 1L);
    }

    private Request createRequest(Record record, int opCode, long sessionId) {
        return createRequest(record, opCode, sessionId, false);
    }

    private Request createRequest(Record record, int opCode, boolean admin) {
        return createRequest(record, opCode, 1L, admin);
    }

    private Request createRequest(Record record, int opCode, long sessionId, boolean admin) {
        List<Id> ids = Collections.singletonList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE);
        return new Request(null, sessionId, 0, opCode, RequestRecord.fromRecord(record), ids);
    }

    private void process(List<Op> ops) throws Exception {
        pLatch = new CountDownLatch(1);
        processor = new PrepRequestProcessor(zks, new MyRequestProcessor());

        Record record = new MultiOperationRecord(ops);
        Request req = createRequest(record, OpCode.multi, false);

        processor.pRequest(req);
        assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain");
    }

    /**
     * This test checks that a successful multi will change outstanding record
     * and failed multi shouldn't change outstanding record.
     */
    @Test
    public void testMultiOutstandingChange() throws Exception {
        zks.getZKDatabase().dataTree.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, 0, 0, 0, 0);

        assertNull(zks.outstandingChangesForPath.get("/foo"));

        process(Arrays.asList(Op.setData("/foo", new byte[0], -1)));

        ChangeRecord cr = zks.outstandingChangesForPath.get("/foo");
        assertNotNull(cr, "Change record wasn't set");
        assertEquals(1, cr.zxid, "Record zxid wasn't set correctly");

        process(Arrays.asList(Op.delete("/foo", -1)));
        cr = zks.outstandingChangesForPath.get("/foo");
        assertEquals(2, cr.zxid, "Record zxid wasn't set correctly");

        // It should fail and shouldn't change outstanding record.
        process(Arrays.asList(Op.delete("/foo", -1)));
        cr = zks.outstandingChangesForPath.get("/foo");
        // zxid should still be previous result because record's not changed.
        assertEquals(2, cr.zxid, "Record zxid wasn't set correctly");
    }

    @Test
    public void testReconfigWithAnotherOutstandingChange() throws Exception {
        QuorumPeerConfig.setReconfigEnabled(true);
        QuorumPeerConfig.setStandaloneEnabled(false);

        QuorumPeer qp = new QuorumPeer();
        QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
        when(quorumVerifierMock.getAllMembers()).thenReturn(LeaderBeanTest.getMockedPeerViews(qp.getMyId()));

        qp.setQuorumVerifier(quorumVerifierMock, false);
        FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
        LeaderZooKeeperServer lzks = new LeaderZooKeeperServer(snapLog, qp, new ZKDatabase(snapLog));
        qp.leader = new Leader(qp, lzks);
        lzks.sessionTracker = new MySessionTracker();
        ZooKeeperServer.setDigestEnabled(true);
        processor = new PrepRequestProcessor(lzks, new MyRequestProcessor());

        Record record = new CreateRequest("/foo", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag());
        pLatch = new CountDownLatch(1);
        processor.pRequest(createRequest(record, OpCode.create, false));
        assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain");

        String newMember = "server.0=localhost:" + PortAssignment.unique()  + ":" + PortAssignment.unique() + ":participant";
        record = new ReconfigRequest(null, null, newMember, 0);
        pLatch = new CountDownLatch(1);
        processor.pRequest(createRequest(record, OpCode.reconfig, true));
        assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain");
        assertEquals(outcome.getHdr().getType(), OpCode.reconfig);   // Verifies that there was no error.
    }

    /**
     * ZOOKEEPER-2052:
     * This test checks that if a multi operation aborted, and during the multi there is side effect
     * that changed outstandingChangesForPath, after aborted the side effect should be removed and
     * everything should be restored correctly.
     */
    @Test
    public void testMultiRollbackNoLastChange() throws Exception {
        zks.getZKDatabase().dataTree.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, 0, 0, 0, 0);
        zks.getZKDatabase().dataTree.createNode("/foo/bar", new byte[0], Ids.OPEN_ACL_UNSAFE, 0, 0, 0, 0);

        assertNull(zks.outstandingChangesForPath.get("/foo"));

        // multi record:
        //   set "/foo" => succeed, leave a outstanding change
        //   delete "/foo" => fail, roll back change
        process(Arrays.asList(Op.setData("/foo", new byte[0], -1), Op.delete("/foo", -1)));

        // aborting multi shouldn't leave any record.
        assertNull(zks.outstandingChangesForPath.get("/foo"));
    }

    /**
     * Test ephemerals are deleted when the session is closed with
     * the newly added CloseSessionTxn in ZOOKEEPER-3145.
     */
    @Test
    public void testCloseSessionTxn() throws Exception {
        boolean before = ZooKeeperServer.isCloseSessionTxnEnabled();

        ZooKeeperServer.setCloseSessionTxnEnabled(true);
        try {
            // create a few ephemerals
            long ephemeralOwner = 1;
            DataTree dt = zks.getZKDatabase().dataTree;
            dt.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0);
            dt.createNode("/bar", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0);

            // close session
            RequestHeader header = new RequestHeader();
            header.setType(OpCode.closeSession);

            final FinalRequestProcessor frq = new FinalRequestProcessor(zks);
            final CountDownLatch latch = new CountDownLatch(1);
            processor = new PrepRequestProcessor(zks, new RequestProcessor() {
                @Override
                public void processRequest(Request request) {
                    frq.processRequest(request);
                    latch.countDown();
                }

                @Override
                public void shutdown() {
                    // TODO Auto-generated method stub
                }
            });
            processor.pRequest(createRequest(header, OpCode.closeSession, ephemeralOwner));

            assertTrue(latch.await(3, TimeUnit.SECONDS));

            // assert ephemerals are deleted
            assertEquals(null, dt.getNode("/foo"));
            assertEquals(null, dt.getNode("/bar"));
        } finally {
            ZooKeeperServer.setCloseSessionTxnEnabled(before);
        }
    }

    /**
     * It tests that PrepRequestProcessor will return BadArgument KeeperException
     * if the request path (if it exists) is not valid, e.g. empty string.
     */
    @Test
    public void testInvalidPath() throws Exception {
        pLatch = new CountDownLatch(1);
        processor = new PrepRequestProcessor(zks, new MyRequestProcessor());

        SetDataRequest record = new SetDataRequest("", new byte[0], -1);
        Request req = createRequest(record, OpCode.setData, false);
        processor.pRequest(req);
        pLatch.await();
        assertEquals(outcome.getHdr().getType(), OpCode.error);
        assertEquals(outcome.getException().code(), KeeperException.Code.BADARGUMENTS);
    }

    private class MyRequestProcessor implements RequestProcessor {

        @Override
        public void processRequest(Request request) {
            // getting called by PrepRequestProcessor
            outcome = request;
            pLatch.countDown();
        }
        @Override
        public void shutdown() {
            // TODO Auto-generated method stub

        }

    }

    private class MySessionTracker implements SessionTracker {

        @Override
        public boolean trackSession(long id, int to) {
            // TODO Auto-generated method stub
            return false;
        }
        @Override
        public boolean commitSession(long id, int to) {
            // TODO Auto-generated method stub
            return false;
        }
        @Override
        public void checkSession(long sessionId, Object owner) throws SessionExpiredException, SessionMovedException {
            // TODO Auto-generated method stub
        }
        @Override
        public long createSession(int sessionTimeout) {
            // TODO Auto-generated method stub
            return 0;
        }
        @Override
        public void dumpSessions(PrintWriter pwriter) {
            // TODO Auto-generated method stub

        }
        @Override
        public void removeSession(long sessionId) {
            // TODO Auto-generated method stub

        }
        public int upgradeSession(long sessionId) {
            // TODO Auto-generated method stub
            return 0;
        }
        @Override
        public void setOwner(long id, Object owner) throws SessionExpiredException {
            // TODO Auto-generated method stub

        }
        @Override
        public void shutdown() {
            // TODO Auto-generated method stub

        }
        @Override
        public boolean touchSession(long sessionId, int sessionTimeout) {
            // TODO Auto-generated method stub
            return false;
        }
        @Override
        public void setSessionClosing(long sessionId) {
            // TODO Auto-generated method stub
        }
        @Override
        public boolean isTrackingSession(long sessionId) {
            // TODO Auto-generated method stub
            return false;
        }
        @Override
        public void checkGlobalSession(long sessionId, Object owner) throws SessionExpiredException, SessionMovedException {
            // TODO Auto-generated method stub
        }
        @Override
        public Map<Long, Set<Long>> getSessionExpiryMap() {
            return new HashMap<Long, Set<Long>>();
        }
        @Override
        public long getLocalSessionCount() {
            return 0;
        }

        @Override
        public boolean isLocalSessionsEnabled() {
            return false;
        }

        public Set<Long> globalSessions() {
            return Collections.emptySet();
        }

        public Set<Long> localSessions() {
            return Collections.emptySet();
        }
    }
}