Commit 0ef62ff8 authored by Mohamad Bashar Desoki's avatar Mohamad Bashar Desoki

kafka producer

parents
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
\ No newline at end of file
<component name="ArtifactManager">
<artifact type="jar" name="java-kafka-producer:jar">
<output-path>$PROJECT_DIR$/out/artifacts/java_kafka_producer_jar</output-path>
<root id="archive" name="java-kafka-producer.jar">
<element id="module-output" name="java-kafka-producer" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/log4j/log4j/1.2.17/log4j-1.2.17.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ArtifactsWorkspaceSettings">
<artifacts-to-build>
<artifact name="java-kafka-producer:jar" />
</artifacts-to-build>
</component>
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="3ff78736-ab06-4e41-962b-0538603918f0" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/artifacts/java_kafka_producer_jar.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/encodings.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/misc.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/Application.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/resources/log4j.properties" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Class" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProjectId" id="2dUVtBEx9AOQNVg91sIwzumkOBB" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"project.structure.last.edited": "Artifacts",
"project.structure.proportion": "0.0",
"project.structure.side.proportion": "0.2"
}
}]]></component>
<component name="RunManager">
<configuration name="Application" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="Application" />
<module name="java-kafka-producer" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<recent_temporary>
<list>
<item itemvalue="Application.Application" />
</list>
</recent_temporary>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="3ff78736-ab06-4e41-962b-0538603918f0" name="Changes" comment="" />
<created>1710062856262</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1710062856262</updated>
</task>
<servers />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>java-kafka-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Apache Kafka - Kafka Producer with Java
*/
public class Application {
private static final String TOPIC = "events";
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
public static void main(String[] args) {
Producer<Long, String> kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
try {
produceMessages(10, kafkaProducer);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
} finally {
kafkaProducer.flush();
kafkaProducer.close();
}
}
public static void produceMessages(int numberOfMessages, Producer<Long, String> kafkaProducer) throws ExecutionException, InterruptedException {
int partition = 1;
for (int i = 0; i < numberOfMessages; i++) {
long key = i;
String value = String.format("event %d", i);
long timeStamp = System.currentTimeMillis();
// ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, partition, timeStamp, key, value);
// no cluster structure
ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, key, value);
// RR
// ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, value);
RecordMetadata recordMetadata = kafkaProducer.send(record).get();
System.out.println(String.format("Record with (key: %s, value: %s), was sent to (partition: %d, offset: %d",
record.key(), record.value(), recordMetadata.partition(), recordMetadata.offset()));
}
}
public static Producer<Long, String> createKafkaProducer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(properties);
}
}
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment